From bd33b0e77381c47b27873e106fea49d76b5c8cc9 Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Sat, 2 Nov 2024 22:04:08 +0000 Subject: [PATCH 01/40] initial azure_logs_ingestion implementation Signed-off-by: Jed Laundry --- .github/workflows/semantic.yml | 1 + Cargo.toml | 2 + src/sinks/azure_logs_ingestion/config.rs | 163 ++++++++++++++++++ src/sinks/azure_logs_ingestion/mod.rs | 13 ++ src/sinks/azure_logs_ingestion/service.rs | 192 ++++++++++++++++++++++ src/sinks/azure_logs_ingestion/sink.rs | 151 +++++++++++++++++ src/sinks/mod.rs | 2 + 7 files changed, 524 insertions(+) create mode 100644 src/sinks/azure_logs_ingestion/config.rs create mode 100644 src/sinks/azure_logs_ingestion/mod.rs create mode 100644 src/sinks/azure_logs_ingestion/service.rs create mode 100644 src/sinks/azure_logs_ingestion/sink.rs diff --git a/.github/workflows/semantic.yml b/.github/workflows/semantic.yml index 1b8ae3c0021c8..87837d0748054 100644 --- a/.github/workflows/semantic.yml +++ b/.github/workflows/semantic.yml @@ -204,6 +204,7 @@ jobs: aws_sqs sink axiom sink azure_blob sink + azure_logs_ingestion sink azure_monitor_logs sink blackhole sink clickhouse sink diff --git a/Cargo.toml b/Cargo.toml index ab9e6f0d3b2d2..01d6df1a506ee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -782,6 +782,7 @@ sinks-logs = [ "sinks-aws_sqs", "sinks-axiom", "sinks-azure_blob", + "sinks-azure_logs_ingestion", "sinks-azure_monitor_logs", "sinks-blackhole", "sinks-chronicle", @@ -848,6 +849,7 @@ sinks-aws_sqs = ["aws-core", "dep:aws-sdk-sqs"] sinks-aws_sns = ["aws-core", "dep:aws-sdk-sns"] sinks-axiom = ["sinks-http"] sinks-azure_blob = ["dep:azure_core", "dep:azure_identity", "dep:azure_storage", "dep:azure_storage_blobs"] +sinks-azure_logs_ingestion = ["dep:azure_core", "dep:azure_identity"] sinks-azure_monitor_logs = [] sinks-blackhole = [] sinks-chronicle = [] diff --git a/src/sinks/azure_logs_ingestion/config.rs b/src/sinks/azure_logs_ingestion/config.rs new file mode 100644 index 0000000000000..5c1249363fa41 --- /dev/null +++ b/src/sinks/azure_logs_ingestion/config.rs @@ -0,0 +1,163 @@ + +use std::sync::Arc; + +use azure_core::auth::TokenCredential; +use vector_lib::configurable::configurable_component; +use vector_lib::schema; +use vrl::value::Kind; + +use crate::{ + http::{get_http_scheme_from_uri, HttpClient}, + sinks::{ + prelude::*, + util::{http::HttpStatusRetryLogic, RealtimeSizeBasedDefaultBatchSettings, UriSerde}, + }, +}; + +use super::{ + service::{AzureLogsIngestionResponse, AzureLogsIngestionService}, + sink::AzureLogsIngestionSink, +}; + +/// Max number of bytes in request body +const MAX_BATCH_SIZE: usize = 30 * 1024 * 1024; + +// Log Ingestion API version +// const API_VERSION: &str = "2023-01-01"; + +/// Configuration for the `azure_logs_ingestion` sink. +#[configurable_component(sink( + "azure_logs_ingestion", + "Publish log events to the Azure Logs Ingestion API." +))] +#[derive(Clone, Debug)] +#[serde(deny_unknown_fields)] +pub struct AzureLogsIngestionConfig { + /// The [Data collection endpoint URI][endpoint] associated with the Log Analytics workspace. + /// + /// [endpoint]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview + #[configurable(metadata(docs::examples = "https://my-dce-5kyl.eastus-1.ingest.monitor.azure.com"))] + pub endpoint: String, + + /// The [Data collection rule][dcr] for the Data collection endpoint. + /// + /// [dcr]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview + #[configurable(metadata(docs::examples = "dcr-000a00a000a00000a000000aa000a0aa"))] + pub dcr: String, + + /// The [Stream name][stream_name] for the Data collection rule. + /// + /// [stream_name]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview + #[configurable(metadata(docs::examples = "Custom-MyTable"))] + pub stream_name: String, + + #[configurable(derived)] + #[serde(default, skip_serializing_if = "crate::serde::is_default")] + pub encoding: Transformer, + + #[configurable(derived)] + #[serde(default)] + pub batch: BatchConfig, + + #[configurable(derived)] + #[serde(default)] + pub request: TowerRequestConfig, + + #[configurable(derived)] + pub tls: Option, + + #[configurable(derived)] + #[serde( + default, + deserialize_with = "crate::serde::bool_or_struct", + skip_serializing_if = "crate::serde::is_default" + )] + pub acknowledgements: AcknowledgementsConfig, +} + +impl Default for AzureLogsIngestionConfig { + fn default() -> Self { + Self { + endpoint: Default::default(), + dcr: Default::default(), + stream_name: Default::default(), + encoding: Default::default(), + batch: Default::default(), + request: Default::default(), + tls: None, + acknowledgements: Default::default(), + } + } +} + +impl AzureLogsIngestionConfig { + + pub(super) async fn build_inner( + &self, + cx: SinkContext, + endpoint: UriSerde, + ) -> crate::Result<(VectorSink, Healthcheck)> { + let endpoint = endpoint.with_default_parts().uri; + let protocol = get_http_scheme_from_uri(&endpoint).to_string(); + + let batch_settings = self + .batch + .validate()? + .limit_max_bytes(MAX_BATCH_SIZE)? + .into_batcher_settings()?; + + // TODO will need to change this as part of upstream 0.20.0 + // https://github.com/Azure/azure-sdk-for-rust/blob/main/sdk/identity/azure_identity/CHANGELOG.md + let credential: Arc = azure_identity::create_credential()?; + + let tls_settings = TlsSettings::from_options(&self.tls)?; + let client = HttpClient::new(Some(tls_settings), &cx.proxy)?; + + let service = AzureLogsIngestionService::new( + client, + endpoint, + credential, + )?; + let healthcheck = service.healthcheck(); + + let retry_logic = + HttpStatusRetryLogic::new(|res: &AzureLogsIngestionResponse| res.http_status); + let request_settings = self.request.into_settings(); + let service = ServiceBuilder::new() + .settings(request_settings, retry_logic) + .service(service); + + let sink = AzureLogsIngestionSink::new( + batch_settings, + self.encoding.clone(), + service, + protocol, + ); + + Ok((VectorSink::from_event_streamsink(sink), healthcheck)) + } +} + +impl_generate_config_from_default!(AzureLogsIngestionConfig); + +#[async_trait::async_trait] +#[typetag::serde(name = "azure_logs_ingestion")] +impl SinkConfig for AzureLogsIngestionConfig { + async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { + // https://my-dce-5kyl.eastus-1.ingest.monitor.azure.com/dataCollectionRules/dcr-000a00a000a00000a000000aa000a0aa/streams/Custom-MyTable?api-version=2023-01-01 + // let endpoint = format!("{}/dataCollectionRules/{}/streams/{}", self.endpoint, self.dcr, self.stream_name).parse()?; + let endpoint = self.endpoint.parse()?; + self.build_inner(cx, endpoint).await + } + + fn input(&self) -> Input { + let requirements = + schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp()); + + Input::log().with_schema_requirement(requirements) + } + + fn acknowledgements(&self) -> &AcknowledgementsConfig { + &self.acknowledgements + } +} diff --git a/src/sinks/azure_logs_ingestion/mod.rs b/src/sinks/azure_logs_ingestion/mod.rs new file mode 100644 index 0000000000000..dddcfc6fa586d --- /dev/null +++ b/src/sinks/azure_logs_ingestion/mod.rs @@ -0,0 +1,13 @@ +//! The Azure Logs Ingestion [`vector_lib::sink::VectorSink`] +//! +//! This module contains the [`vector_lib::sink::VectorSink`] instance that is responsible for +//! taking a stream of [`vector_lib::event::Event`] instances and forwarding them to the Azure +//! Logs Ingestion API. + +mod config; +mod service; +mod sink; +// #[cfg(test)] +// mod tests; + +pub use config::AzureLogsIngestionConfig; diff --git a/src/sinks/azure_logs_ingestion/service.rs b/src/sinks/azure_logs_ingestion/service.rs new file mode 100644 index 0000000000000..9a7d99ecad6d8 --- /dev/null +++ b/src/sinks/azure_logs_ingestion/service.rs @@ -0,0 +1,192 @@ +use std::sync::LazyLock; +use std::task::{Context, Poll}; +use std::sync::Arc; +use futures::executor; + +use azure_core::auth::TokenCredential; + +use bytes::Bytes; +use http::{ + header::{self, HeaderMap}, + HeaderValue, Request, StatusCode, Uri, +}; +use hyper::Body; +use tracing::Instrument; + +use crate::{http::HttpClient, sinks::prelude::*}; + +// JSON content type of logs +const CONTENT_TYPE: &str = "application/json"; + +static CONTENT_TYPE_VALUE: LazyLock = + LazyLock::new(|| HeaderValue::from_static(CONTENT_TYPE)); +// static X_MS_CLIENT_REQUEST_ID_HEADER: LazyLock = +// LazyLock::new(|| HeaderName::from_static("x-ms-client-request-id")); + +#[derive(Debug, Clone)] +pub struct AzureLogsIngestionRequest { + pub body: Bytes, + pub finalizers: EventFinalizers, + pub metadata: RequestMetadata, +} + +impl MetaDescriptive for AzureLogsIngestionRequest { + fn get_metadata(&self) -> &RequestMetadata { + &self.metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.metadata + } +} + +impl Finalizable for AzureLogsIngestionRequest { + fn take_finalizers(&mut self) -> EventFinalizers { + self.finalizers.take_finalizers() + } +} + +pub struct AzureLogsIngestionResponse { + pub http_status: StatusCode, + pub events_byte_size: GroupedCountByteSize, + pub raw_byte_size: usize, +} + +impl DriverResponse for AzureLogsIngestionResponse { + fn event_status(&self) -> EventStatus { + match self.http_status.is_success() { + true => EventStatus::Delivered, + false => EventStatus::Rejected, + } + } + + fn events_sent(&self) -> &GroupedCountByteSize { + &self.events_byte_size + } + + fn bytes_sent(&self) -> Option { + Some(self.raw_byte_size) + } +} + +/// `AzureLogsIngestionService` is a `Tower` service used to send logs to Azure. +#[derive(Debug, Clone)] +pub struct AzureLogsIngestionService { + client: HttpClient, + endpoint: Uri, + credential: Arc, + default_headers: HeaderMap, +} + +impl AzureLogsIngestionService { + /// Creates a new `AzureLogsIngestionService`. + pub fn new( + client: HttpClient, + endpoint: Uri, + credential: Arc, + ) -> crate::Result { + // let mut parts = endpoint.into_parts(); + // parts.path_and_query = Some( + // format!("a9ee8e5b-ed0e-4980-9b9c-15e1f939db7f?api-version={API_VERSION}") + // .parse() + // .expect("query should never fail to parse"), + // ); + // let endpoint = Uri::from_parts(parts)?; + + let default_headers = { + let mut headers = HeaderMap::new(); + + headers.insert(header::CONTENT_TYPE, CONTENT_TYPE_VALUE.clone()); + headers + }; + + Ok(Self { + client, + endpoint, + credential, + default_headers, + }) + } + + fn build_request(&self, body: Bytes) -> crate::Result> { + let mut request = Request::post(&self.endpoint).body(Body::from(body))?; + + // TODO: make this an option, for soverign clouds + let access_token = executor::block_on(self.credential + .get_token(&["https://monitor.azure.com/.default"])) + .expect("failed to get access token from credential"); + + let bearer = format!("Bearer {}", access_token.token.secret()); + + *request.headers_mut() = self.default_headers.clone(); + request + .headers_mut() + .insert( + header::AUTHORIZATION, + HeaderValue::from_str(&bearer).unwrap() + ); + + Ok(request) + } + + pub fn healthcheck(&self) -> Healthcheck { + let mut client = self.client.clone(); + let request = self.build_request(Bytes::from("[]")); + Box::pin(async move { + let request = request?; + let res = client.call(request).in_current_span().await?; + + if res.status().is_server_error() { + return Err("Server returned a server error".into()); + } + + if res.status() == StatusCode::FORBIDDEN { + return Err("The service failed to authenticate the request. Verify that the workspace ID and connection key are valid".into()); + } + + if res.status() == StatusCode::NOT_FOUND { + return Err( + "Either the URL provided is incorrect, or the request is too large".into(), + ); + } + + if res.status() == StatusCode::BAD_REQUEST { + return Err("The workspace has been closed or the request was invalid".into()); + } + + Ok(()) + }) + } +} + +impl Service for AzureLogsIngestionService { + type Response = AzureLogsIngestionResponse; + type Error = crate::Error; + type Future = BoxFuture<'static, Result>; + + // Emission of Error internal event is handled upstream by the caller. + fn poll_ready(&mut self, _cx: &mut Context) -> Poll> { + Poll::Ready(Ok(())) + } + + // Emission of Error internal event is handled upstream by the caller. + fn call(&mut self, request: AzureLogsIngestionRequest) -> Self::Future { + let mut client = self.client.clone(); + let http_request = self.build_request(request.body); + Box::pin(async move { + let http_request = http_request?; + let response = client.call(http_request).in_current_span().await?; + let response_status = response.status(); + // let body_bytes: Bytes = hyper::body::to_bytes(response.into_body()).await.unwrap(); + // let body_string: String = String::from_utf8(body_bytes.to_vec()).unwrap(); + // println!("response: {}", body_string); + Ok(AzureLogsIngestionResponse { + http_status: response_status, + raw_byte_size: request.metadata.request_encoded_size(), + events_byte_size: request + .metadata + .into_events_estimated_json_encoded_byte_size(), + }) + }) + } +} diff --git a/src/sinks/azure_logs_ingestion/sink.rs b/src/sinks/azure_logs_ingestion/sink.rs new file mode 100644 index 0000000000000..e86e1d27b67a1 --- /dev/null +++ b/src/sinks/azure_logs_ingestion/sink.rs @@ -0,0 +1,151 @@ +use std::{fmt::Debug, io}; + +use bytes::Bytes; +use vector_lib::codecs::{encoding::Framer, CharacterDelimitedEncoder, JsonSerializerConfig}; + +use crate::sinks::prelude::*; + +use super::service::AzureLogsIngestionRequest; + +pub struct AzureLogsIngestionSink { + batch_settings: BatcherSettings, + encoding: JsonEncoding, + service: S, + protocol: String, +} + +impl AzureLogsIngestionSink +where + S: Service + Send + 'static, + S::Future: Send + 'static, + S::Response: DriverResponse + Send + 'static, + S::Error: Debug + Into + Send, +{ + pub fn new( + batch_settings: BatcherSettings, + transformer: Transformer, + service: S, + protocol: String, + ) -> Self { + Self { + batch_settings, + encoding: JsonEncoding::new(transformer), + service, + protocol, + } + } + + async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + input + .batched(self.batch_settings.as_byte_size_config()) + .request_builder( + default_request_builder_concurrency_limit(), + AzureLogsIngestionRequestBuilder { + encoding: self.encoding, + }, + ) + .filter_map(|request| async { + match request { + Err(error) => { + emit!(SinkRequestBuildError { error }); + None + } + Ok(req) => Some(req), + } + }) + .into_driver(self.service) + .protocol(self.protocol.clone()) + .run() + .await + } +} + +#[async_trait::async_trait] +impl StreamSink for AzureLogsIngestionSink +where + S: Service + Send + 'static, + S::Future: Send + 'static, + S::Response: DriverResponse + Send + 'static, + S::Error: Debug + Into + Send, +{ + async fn run( + self: Box, + input: futures_util::stream::BoxStream<'_, Event>, + ) -> Result<(), ()> { + self.run_inner(input).await + } +} + +/// Customized encoding specific to the Azure Logs Ingestion sink, as the API does not support full +/// 9-digit nanosecond precision timestamps. +#[derive(Clone, Debug)] +pub(super) struct JsonEncoding { + encoder: (Transformer, Encoder), +} + +impl JsonEncoding { + pub fn new(transformer: Transformer) -> Self { + Self { + encoder: ( + transformer, + Encoder::::new( + CharacterDelimitedEncoder::new(b',').into(), + JsonSerializerConfig::default().build().into(), + ), + ), + } + } +} + +impl crate::sinks::util::encoding::Encoder> for JsonEncoding { + fn encode_input( + &self, + input: Vec, + writer: &mut dyn io::Write, + ) -> io::Result<(usize, GroupedCountByteSize)> { + self.encoder.encode_input(input, writer) + } +} + +struct AzureLogsIngestionRequestBuilder { + encoding: JsonEncoding, +} + +impl RequestBuilder> for AzureLogsIngestionRequestBuilder { + type Metadata = EventFinalizers; + type Events = Vec; + type Encoder = JsonEncoding; + type Payload = Bytes; + type Request = AzureLogsIngestionRequest; + type Error = std::io::Error; + + fn compression(&self) -> Compression { + Compression::None + } + + fn encoder(&self) -> &Self::Encoder { + &self.encoding + } + + fn split_input( + &self, + mut events: Vec, + ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) { + let finalizers = events.take_finalizers(); + let builder = RequestMetadataBuilder::from_events(&events); + (finalizers, builder, events) + } + + fn build_request( + &self, + finalizers: Self::Metadata, + request_metadata: RequestMetadata, + payload: EncodeResult, + ) -> Self::Request { + AzureLogsIngestionRequest { + body: payload.into_payload(), + finalizers, + metadata: request_metadata, + } + } +} diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs index b5a45a462566e..5c84a96ccd7f4 100644 --- a/src/sinks/mod.rs +++ b/src/sinks/mod.rs @@ -28,6 +28,8 @@ pub mod axiom; pub mod azure_blob; #[cfg(feature = "sinks-azure_blob")] pub mod azure_common; +#[cfg(feature = "sinks-azure_logs_ingestion")] +pub mod azure_logs_ingestion; #[cfg(feature = "sinks-azure_monitor_logs")] pub mod azure_monitor_logs; #[cfg(feature = "sinks-blackhole")] From 0ca7909d365adc99c5b1d51b1fed278c0e9c9755 Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Sat, 2 Nov 2024 23:16:15 +0000 Subject: [PATCH 02/40] tidy up error messages --- src/sinks/azure_logs_ingestion/service.rs | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/src/sinks/azure_logs_ingestion/service.rs b/src/sinks/azure_logs_ingestion/service.rs index 9a7d99ecad6d8..ad01bbb05af1f 100644 --- a/src/sinks/azure_logs_ingestion/service.rs +++ b/src/sinks/azure_logs_ingestion/service.rs @@ -137,21 +137,26 @@ impl AzureLogsIngestionService { let res = client.call(request).in_current_span().await?; if res.status().is_server_error() { - return Err("Server returned a server error".into()); + return Err("Azure returned a server error".into()); + } + + if res.status() == StatusCode::UNAUTHORIZED { + return Err("Azure returned 401 Unauthorised. Check that the token_scope matches the soverign cloud endpoint.".into()); } if res.status() == StatusCode::FORBIDDEN { - return Err("The service failed to authenticate the request. Verify that the workspace ID and connection key are valid".into()); + return Err("Azure returned 403 Forbidden. Verify that the credential has the Monitoring Metrics Publisher role on the Data Collection Rule.".into()); } if res.status() == StatusCode::NOT_FOUND { - return Err( - "Either the URL provided is incorrect, or the request is too large".into(), - ); + return Err("Azure returned 404 Not Found. Either the URL provided is incorrect, or the request is too large".into()); } if res.status() == StatusCode::BAD_REQUEST { - return Err("The workspace has been closed or the request was invalid".into()); + let body_bytes: Bytes = hyper::body::to_bytes(res.into_body()).await.unwrap(); + let body_string: String = String::from_utf8(body_bytes.to_vec()).unwrap(); + let err_string: String = format!("Azure returned 400 Bad Request: {body_string}"); + return Err(err_string.into()); } Ok(()) @@ -176,12 +181,8 @@ impl Service for AzureLogsIngestionService { Box::pin(async move { let http_request = http_request?; let response = client.call(http_request).in_current_span().await?; - let response_status = response.status(); - // let body_bytes: Bytes = hyper::body::to_bytes(response.into_body()).await.unwrap(); - // let body_string: String = String::from_utf8(body_bytes.to_vec()).unwrap(); - // println!("response: {}", body_string); Ok(AzureLogsIngestionResponse { - http_status: response_status, + http_status: response.status(), raw_byte_size: request.metadata.request_encoded_size(), events_byte_size: request .metadata From 9c075211cc66ef343b518d7c1401a65e48730295 Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Sat, 2 Nov 2024 23:18:16 +0000 Subject: [PATCH 03/40] make token_scope configurable Signed-off-by: Jed Laundry --- src/sinks/azure_logs_ingestion/config.rs | 16 ++++++++++++++-- src/sinks/azure_logs_ingestion/service.rs | 5 ++++- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/src/sinks/azure_logs_ingestion/config.rs b/src/sinks/azure_logs_ingestion/config.rs index 5c1249363fa41..5d5a0ef9c8ead 100644 --- a/src/sinks/azure_logs_ingestion/config.rs +++ b/src/sinks/azure_logs_ingestion/config.rs @@ -22,8 +22,9 @@ use super::{ /// Max number of bytes in request body const MAX_BATCH_SIZE: usize = 30 * 1024 * 1024; -// Log Ingestion API version -// const API_VERSION: &str = "2023-01-01"; +pub(super) fn default_scope() -> String { + "https://monitor.azure.com/.default".into() +} /// Configuration for the `azure_logs_ingestion` sink. #[configurable_component(sink( @@ -51,6 +52,14 @@ pub struct AzureLogsIngestionConfig { #[configurable(metadata(docs::examples = "Custom-MyTable"))] pub stream_name: String, + /// [Token scope][token_scope] for dedicated Azure regions. + /// + /// [token_scope]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview + #[configurable(metadata(docs::examples = "https://monitor.azure.us/.default"))] + #[configurable(metadata(docs::examples = "https://monitor.azure.cn/.default"))] + #[serde(default = "default_scope")] + pub(super) token_scope: String, + #[configurable(derived)] #[serde(default, skip_serializing_if = "crate::serde::is_default")] pub encoding: Transformer, @@ -81,6 +90,7 @@ impl Default for AzureLogsIngestionConfig { endpoint: Default::default(), dcr: Default::default(), stream_name: Default::default(), + token_scope: default_scope(), encoding: Default::default(), batch: Default::default(), request: Default::default(), @@ -96,6 +106,7 @@ impl AzureLogsIngestionConfig { &self, cx: SinkContext, endpoint: UriSerde, + token_scope: String, ) -> crate::Result<(VectorSink, Healthcheck)> { let endpoint = endpoint.with_default_parts().uri; let protocol = get_http_scheme_from_uri(&endpoint).to_string(); @@ -117,6 +128,7 @@ impl AzureLogsIngestionConfig { client, endpoint, credential, + token_scope, )?; let healthcheck = service.healthcheck(); diff --git a/src/sinks/azure_logs_ingestion/service.rs b/src/sinks/azure_logs_ingestion/service.rs index ad01bbb05af1f..a4ff46c55554f 100644 --- a/src/sinks/azure_logs_ingestion/service.rs +++ b/src/sinks/azure_logs_ingestion/service.rs @@ -75,6 +75,7 @@ pub struct AzureLogsIngestionService { client: HttpClient, endpoint: Uri, credential: Arc, + token_scope: String, default_headers: HeaderMap, } @@ -84,6 +85,7 @@ impl AzureLogsIngestionService { client: HttpClient, endpoint: Uri, credential: Arc, + token_scope: String, ) -> crate::Result { // let mut parts = endpoint.into_parts(); // parts.path_and_query = Some( @@ -104,6 +106,7 @@ impl AzureLogsIngestionService { client, endpoint, credential, + token_scope, default_headers, }) } @@ -113,7 +116,7 @@ impl AzureLogsIngestionService { // TODO: make this an option, for soverign clouds let access_token = executor::block_on(self.credential - .get_token(&["https://monitor.azure.com/.default"])) + .get_token(&[&self.token_scope])) .expect("failed to get access token from credential"); let bearer = format!("Bearer {}", access_token.token.secret()); From e3ed26db6d83524697d86dfc08035a025c593aed Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Sat, 2 Nov 2024 23:22:14 +0000 Subject: [PATCH 04/40] parameterise the dcr_immutable_id and stream_name Signed-off-by: Jed Laundry --- src/sinks/azure_logs_ingestion/config.rs | 25 +++++++++++++++-------- src/sinks/azure_logs_ingestion/service.rs | 22 ++++++++++++-------- 2 files changed, 31 insertions(+), 16 deletions(-) diff --git a/src/sinks/azure_logs_ingestion/config.rs b/src/sinks/azure_logs_ingestion/config.rs index 5d5a0ef9c8ead..711ceae983fd2 100644 --- a/src/sinks/azure_logs_ingestion/config.rs +++ b/src/sinks/azure_logs_ingestion/config.rs @@ -40,11 +40,11 @@ pub struct AzureLogsIngestionConfig { #[configurable(metadata(docs::examples = "https://my-dce-5kyl.eastus-1.ingest.monitor.azure.com"))] pub endpoint: String, - /// The [Data collection rule][dcr] for the Data collection endpoint. + /// The [Data collection rule immutable ID][dcr_immutable_id] for the Data collection endpoint. /// - /// [dcr]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview + /// [dcr_immutable_id]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview #[configurable(metadata(docs::examples = "dcr-000a00a000a00000a000000aa000a0aa"))] - pub dcr: String, + pub dcr_immutable_id: String, /// The [Stream name][stream_name] for the Data collection rule. /// @@ -88,7 +88,7 @@ impl Default for AzureLogsIngestionConfig { fn default() -> Self { Self { endpoint: Default::default(), - dcr: Default::default(), + dcr_immutable_id: Default::default(), stream_name: Default::default(), token_scope: default_scope(), encoding: Default::default(), @@ -106,6 +106,8 @@ impl AzureLogsIngestionConfig { &self, cx: SinkContext, endpoint: UriSerde, + dcr_immutable_id: String, + stream_name: String, token_scope: String, ) -> crate::Result<(VectorSink, Healthcheck)> { let endpoint = endpoint.with_default_parts().uri; @@ -127,6 +129,8 @@ impl AzureLogsIngestionConfig { let service = AzureLogsIngestionService::new( client, endpoint, + dcr_immutable_id, + stream_name, credential, token_scope, )?; @@ -156,10 +160,15 @@ impl_generate_config_from_default!(AzureLogsIngestionConfig); #[typetag::serde(name = "azure_logs_ingestion")] impl SinkConfig for AzureLogsIngestionConfig { async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { - // https://my-dce-5kyl.eastus-1.ingest.monitor.azure.com/dataCollectionRules/dcr-000a00a000a00000a000000aa000a0aa/streams/Custom-MyTable?api-version=2023-01-01 - // let endpoint = format!("{}/dataCollectionRules/{}/streams/{}", self.endpoint, self.dcr, self.stream_name).parse()?; - let endpoint = self.endpoint.parse()?; - self.build_inner(cx, endpoint).await + + let endpoint: UriSerde = self.endpoint.parse()?; + self.build_inner( + cx, + endpoint, + self.dcr_immutable_id.clone(), + self.stream_name.clone(), + self.token_scope.clone(), + ).await } fn input(&self) -> Input { diff --git a/src/sinks/azure_logs_ingestion/service.rs b/src/sinks/azure_logs_ingestion/service.rs index a4ff46c55554f..757ccc5a70ab7 100644 --- a/src/sinks/azure_logs_ingestion/service.rs +++ b/src/sinks/azure_logs_ingestion/service.rs @@ -15,7 +15,10 @@ use tracing::Instrument; use crate::{http::HttpClient, sinks::prelude::*}; -// JSON content type of logs +/// Log Ingestion API version +const API_VERSION: &str = "2023-01-01"; + +/// JSON content type of logs const CONTENT_TYPE: &str = "application/json"; static CONTENT_TYPE_VALUE: LazyLock = @@ -84,16 +87,19 @@ impl AzureLogsIngestionService { pub fn new( client: HttpClient, endpoint: Uri, + dcr_immutable_id: String, + stream_name: String, credential: Arc, token_scope: String, ) -> crate::Result { - // let mut parts = endpoint.into_parts(); - // parts.path_and_query = Some( - // format!("a9ee8e5b-ed0e-4980-9b9c-15e1f939db7f?api-version={API_VERSION}") - // .parse() - // .expect("query should never fail to parse"), - // ); - // let endpoint = Uri::from_parts(parts)?; + let mut parts = endpoint.into_parts(); + parts.path_and_query = Some( + // https://my-dce-5kyl.eastus-1.ingest.monitor.azure.com/dataCollectionRules/dcr-000a00a000a00000a000000aa000a0aa/streams/Custom-MyTable?api-version=2023-01-01 + format!("/dataCollectionRules/{dcr_immutable_id}/streams/{stream_name}?api-version={API_VERSION}") + .parse() + .expect("path and query should never fail to parse"), + ); + let endpoint = Uri::from_parts(parts)?; let default_headers = { let mut headers = HeaderMap::new(); From 5a1748b8d8bbada958dc7fe21782a8691af23723 Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Sat, 2 Nov 2024 23:30:57 +0000 Subject: [PATCH 05/40] spelling mistake --- src/sinks/azure_logs_ingestion/service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sinks/azure_logs_ingestion/service.rs b/src/sinks/azure_logs_ingestion/service.rs index 757ccc5a70ab7..7c593c4562927 100644 --- a/src/sinks/azure_logs_ingestion/service.rs +++ b/src/sinks/azure_logs_ingestion/service.rs @@ -150,7 +150,7 @@ impl AzureLogsIngestionService { } if res.status() == StatusCode::UNAUTHORIZED { - return Err("Azure returned 401 Unauthorised. Check that the token_scope matches the soverign cloud endpoint.".into()); + return Err("Azure returned 401 Unauthorised. Check that the token_scope matches the sovereign cloud endpoint.".into()); } if res.status() == StatusCode::FORBIDDEN { From 1759ca7a9acbcfa38d18cce8bd44e9084ca774a8 Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Fri, 17 Jan 2025 22:06:03 +1300 Subject: [PATCH 06/40] add reminder from upstream changes --- src/sinks/azure_logs_ingestion/config.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/sinks/azure_logs_ingestion/config.rs b/src/sinks/azure_logs_ingestion/config.rs index 711ceae983fd2..0b2a8f3aa1590 100644 --- a/src/sinks/azure_logs_ingestion/config.rs +++ b/src/sinks/azure_logs_ingestion/config.rs @@ -123,6 +123,8 @@ impl AzureLogsIngestionConfig { // https://github.com/Azure/azure-sdk-for-rust/blob/main/sdk/identity/azure_identity/CHANGELOG.md let credential: Arc = azure_identity::create_credential()?; + // TODO this needs to change with toolchain 1.83.0, as per commit ca084cc (#22068) + //let tls_settings = TlsSettings::from_options(self.tls.as_ref())?; let tls_settings = TlsSettings::from_options(&self.tls)?; let client = HttpClient::new(Some(tls_settings), &cx.proxy)?; From fb0699e6a4fbbbc83d9e492d9821d06422c16bee Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Sat, 2 Nov 2024 03:49:11 +0000 Subject: [PATCH 07/40] update azure crates to 0.21 Signed-off-by: Jed Laundry --- Cargo.lock | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index ff9e8907b9c2f..4a541b971649e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -880,6 +880,25 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "async-process" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63255f1dc2381611000436537bbedfe83183faa303a5a0edaf191edef06526bb" +dependencies = [ + "async-channel 2.3.1", + "async-io 2.1.0", + "async-lock 3.4.0", + "async-signal", + "async-task", + "blocking", + "cfg-if", + "event-listener 5.3.1", + "futures-lite 2.4.0", + "rustix 0.38.37", + "tracing 0.1.40", +] + [[package]] name = "async-reactor-trait" version = "1.1.0" @@ -1736,6 +1755,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "82c33c072c9d87777262f35abfe2a64b609437076551d4dac8373e60f0e3fde9" dependencies = [ "async-lock 3.4.0", + "async-process 2.3.0", "async-trait", "bytes 1.10.1", "futures 0.3.31", @@ -10458,7 +10478,7 @@ dependencies = [ "async-io 1.13.0", "async-lock 2.8.0", "async-net", - "async-process", + "async-process 1.8.1", "blocking", "futures-lite 1.13.0", ] From 46a467ebd3238ebbd540c2d11a2ca85acdec237d Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Sun, 20 Apr 2025 20:46:10 +0000 Subject: [PATCH 08/40] update for 1.83 toolchain Signed-off-by: Jed Laundry --- src/sinks/azure_logs_ingestion/config.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/sinks/azure_logs_ingestion/config.rs b/src/sinks/azure_logs_ingestion/config.rs index 0b2a8f3aa1590..86c478a494424 100644 --- a/src/sinks/azure_logs_ingestion/config.rs +++ b/src/sinks/azure_logs_ingestion/config.rs @@ -119,13 +119,9 @@ impl AzureLogsIngestionConfig { .limit_max_bytes(MAX_BATCH_SIZE)? .into_batcher_settings()?; - // TODO will need to change this as part of upstream 0.20.0 - // https://github.com/Azure/azure-sdk-for-rust/blob/main/sdk/identity/azure_identity/CHANGELOG.md let credential: Arc = azure_identity::create_credential()?; - // TODO this needs to change with toolchain 1.83.0, as per commit ca084cc (#22068) - //let tls_settings = TlsSettings::from_options(self.tls.as_ref())?; - let tls_settings = TlsSettings::from_options(&self.tls)?; + let tls_settings = TlsSettings::from_options(self.tls.as_ref())?; let client = HttpClient::new(Some(tls_settings), &cx.proxy)?; let service = AzureLogsIngestionService::new( From f211a0934c6c1d898c2fa9cdf8289cd178366946 Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Sun, 20 Apr 2025 22:28:19 +0000 Subject: [PATCH 09/40] Add configurable timestamp field Signed-off-by: Jed Laundry --- src/sinks/azure_logs_ingestion/config.rs | 19 ++++++++++++++ src/sinks/azure_logs_ingestion/sink.rs | 32 ++++++++++++++++++++---- 2 files changed, 46 insertions(+), 5 deletions(-) diff --git a/src/sinks/azure_logs_ingestion/config.rs b/src/sinks/azure_logs_ingestion/config.rs index 86c478a494424..95a6f88bb1083 100644 --- a/src/sinks/azure_logs_ingestion/config.rs +++ b/src/sinks/azure_logs_ingestion/config.rs @@ -26,6 +26,10 @@ pub(super) fn default_scope() -> String { "https://monitor.azure.com/.default".into() } +pub(super) fn default_timestamp_field() -> String { + "TimeGenerated".into() +} + /// Configuration for the `azure_logs_ingestion` sink. #[configurable_component(sink( "azure_logs_ingestion", @@ -60,6 +64,17 @@ pub struct AzureLogsIngestionConfig { #[serde(default = "default_scope")] pub(super) token_scope: String, + /// The destination field (column) for the timestamp. + /// + /// The setting of `log_schema.timestamp_key`, usually `timestamp`, is used as the source. + /// Most schemas use `TimeGenerated`, but some use `Timestamp` (legacy) or `EventStartTime` (ASIM) [std_columns]. + /// + /// [std_columns]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/log-standard-columns#timegenerated + #[configurable(metadata(docs::examples = "EventStartTime"))] + #[configurable(metadata(docs::examples = "Timestamp"))] + #[serde(default = "default_timestamp_field")] + pub timestamp_field: String, + #[configurable(derived)] #[serde(default, skip_serializing_if = "crate::serde::is_default")] pub encoding: Transformer, @@ -91,6 +106,7 @@ impl Default for AzureLogsIngestionConfig { dcr_immutable_id: Default::default(), stream_name: Default::default(), token_scope: default_scope(), + timestamp_field: default_timestamp_field(), encoding: Default::default(), batch: Default::default(), request: Default::default(), @@ -109,6 +125,7 @@ impl AzureLogsIngestionConfig { dcr_immutable_id: String, stream_name: String, token_scope: String, + timestamp_field: String, ) -> crate::Result<(VectorSink, Healthcheck)> { let endpoint = endpoint.with_default_parts().uri; let protocol = get_http_scheme_from_uri(&endpoint).to_string(); @@ -145,6 +162,7 @@ impl AzureLogsIngestionConfig { batch_settings, self.encoding.clone(), service, + timestamp_field, protocol, ); @@ -166,6 +184,7 @@ impl SinkConfig for AzureLogsIngestionConfig { self.dcr_immutable_id.clone(), self.stream_name.clone(), self.token_scope.clone(), + self.timestamp_field.clone(), ).await } diff --git a/src/sinks/azure_logs_ingestion/sink.rs b/src/sinks/azure_logs_ingestion/sink.rs index e86e1d27b67a1..6168ef51683a7 100644 --- a/src/sinks/azure_logs_ingestion/sink.rs +++ b/src/sinks/azure_logs_ingestion/sink.rs @@ -2,6 +2,7 @@ use std::{fmt::Debug, io}; use bytes::Bytes; use vector_lib::codecs::{encoding::Framer, CharacterDelimitedEncoder, JsonSerializerConfig}; +use vector_lib::lookup::PathPrefix; use crate::sinks::prelude::*; @@ -25,11 +26,12 @@ where batch_settings: BatcherSettings, transformer: Transformer, service: S, + timestamp_field: String, protocol: String, ) -> Self { Self { batch_settings, - encoding: JsonEncoding::new(transformer), + encoding: JsonEncoding::new(transformer, timestamp_field), service, protocol, } @@ -76,16 +78,17 @@ where } } -/// Customized encoding specific to the Azure Logs Ingestion sink, as the API does not support full -/// 9-digit nanosecond precision timestamps. +/// Customized encoding specific to the Azure Logs Ingestion sink. #[derive(Clone, Debug)] pub(super) struct JsonEncoding { + timestamp_field: String, encoder: (Transformer, Encoder), } impl JsonEncoding { - pub fn new(transformer: Transformer) -> Self { + pub fn new(transformer: Transformer, timestamp_field: String) -> Self { Self { + timestamp_field, encoder: ( transformer, Encoder::::new( @@ -100,9 +103,28 @@ impl JsonEncoding { impl crate::sinks::util::encoding::Encoder> for JsonEncoding { fn encode_input( &self, - input: Vec, + mut input: Vec, writer: &mut dyn io::Write, ) -> io::Result<(usize, GroupedCountByteSize)> { + for event in input.iter_mut() { + let log = event.as_mut_log(); + + // `.remove_timestamp()` will return the `timestamp` value regardless of location in Event or + // Metadata, the following `insert()` ensures it's encoded in the request. + let timestamp = if let Some(Value::Timestamp(ts)) = log.remove_timestamp() { + ts + } else { + chrono::Utc::now() + }; + + log.insert( + (PathPrefix::Event, self.timestamp_field.as_str()), + serde_json::Value::String( + timestamp.to_rfc3339_opts(chrono::SecondsFormat::Micros, true), + ), + ); + } + self.encoder.encode_input(input, writer) } } From ab2014ed9b892ce6ec26371a2c58a2af7e098a6c Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Sun, 20 Apr 2025 22:40:31 +0000 Subject: [PATCH 10/40] add docs Signed-off-by: Jed Laundry --- src/sinks/azure_logs_ingestion/config.rs | 2 +- src/sinks/azure_monitor_logs/config.rs | 2 +- .../sinks/generated/azure_logs_ingestion.cue | 424 ++++++++++++++++++ 3 files changed, 426 insertions(+), 2 deletions(-) create mode 100644 website/cue/reference/components/sinks/generated/azure_logs_ingestion.cue diff --git a/src/sinks/azure_logs_ingestion/config.rs b/src/sinks/azure_logs_ingestion/config.rs index 95a6f88bb1083..f602838e1acac 100644 --- a/src/sinks/azure_logs_ingestion/config.rs +++ b/src/sinks/azure_logs_ingestion/config.rs @@ -33,7 +33,7 @@ pub(super) fn default_timestamp_field() -> String { /// Configuration for the `azure_logs_ingestion` sink. #[configurable_component(sink( "azure_logs_ingestion", - "Publish log events to the Azure Logs Ingestion API." + "Publish log events to the Azure Monitor Logs Ingestion API." ))] #[derive(Clone, Debug)] #[serde(deny_unknown_fields)] diff --git a/src/sinks/azure_monitor_logs/config.rs b/src/sinks/azure_monitor_logs/config.rs index 0b923894f1b3e..dc60d179a2ff5 100644 --- a/src/sinks/azure_monitor_logs/config.rs +++ b/src/sinks/azure_monitor_logs/config.rs @@ -30,7 +30,7 @@ pub(super) fn default_host() -> String { /// Configuration for the `azure_monitor_logs` sink. #[configurable_component(sink( "azure_monitor_logs", - "Publish log events to the Azure Monitor Logs service." + "Publish log events to the Azure Monitor Data Collector API." ))] #[derive(Clone, Debug)] #[serde(deny_unknown_fields)] diff --git a/website/cue/reference/components/sinks/generated/azure_logs_ingestion.cue b/website/cue/reference/components/sinks/generated/azure_logs_ingestion.cue new file mode 100644 index 0000000000000..3251ef421da3c --- /dev/null +++ b/website/cue/reference/components/sinks/generated/azure_logs_ingestion.cue @@ -0,0 +1,424 @@ +package metadata + +base: components: sinks: azure_logs_ingestion: configuration: { + acknowledgements: { + description: """ + Controls how acknowledgements are handled for this sink. + + See [End-to-end Acknowledgements][e2e_acks] for more information on how event acknowledgement is handled. + + [e2e_acks]: https://vector.dev/docs/about/under-the-hood/architecture/end-to-end-acknowledgements/ + """ + required: false + type: object: options: enabled: { + description: """ + Whether or not end-to-end acknowledgements are enabled. + + When enabled for a sink, any source that supports end-to-end + acknowledgements that is connected to that sink waits for events + to be acknowledged by **all connected sinks** before acknowledging them at the source. + + Enabling or disabling acknowledgements at the sink level takes precedence over any global + [`acknowledgements`][global_acks] configuration. + + [global_acks]: https://vector.dev/docs/reference/configuration/global-options/#acknowledgements + """ + required: false + type: bool: {} + } + } + batch: { + description: "Event batching behavior." + required: false + type: object: options: { + max_bytes: { + description: """ + The maximum size of a batch that is processed by a sink. + + This is based on the uncompressed size of the batched events, before they are + serialized/compressed. + """ + required: false + type: uint: { + default: 10000000 + unit: "bytes" + } + } + max_events: { + description: "The maximum size of a batch before it is flushed." + required: false + type: uint: unit: "events" + } + timeout_secs: { + description: "The maximum age of a batch before it is flushed." + required: false + type: float: { + default: 1.0 + unit: "seconds" + } + } + } + } + dcr_immutable_id: { + description: """ + The [Data collection rule immutable ID][dcr_immutable_id] for the Data collection endpoint. + + [dcr_immutable_id]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview + """ + required: true + type: string: examples: ["dcr-000a00a000a00000a000000aa000a0aa"] + } + encoding: { + description: "Transformations to prepare an event for serialization." + required: false + type: object: options: { + except_fields: { + description: "List of fields that are excluded from the encoded event." + required: false + type: array: items: type: string: {} + } + only_fields: { + description: "List of fields that are included in the encoded event." + required: false + type: array: items: type: string: {} + } + timestamp_format: { + description: "Format used for timestamp fields." + required: false + type: string: enum: { + rfc3339: "Represent the timestamp as a RFC 3339 timestamp." + unix: "Represent the timestamp as a Unix timestamp." + unix_float: "Represent the timestamp as a Unix timestamp in floating point." + unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds." + unix_ns: "Represent the timestamp as a Unix timestamp in nanoseconds." + unix_us: "Represent the timestamp as a Unix timestamp in microseconds" + } + } + } + } + endpoint: { + description: """ + The [Data collection endpoint URI][endpoint] associated with the Log Analytics workspace. + + [endpoint]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview + """ + required: true + type: string: examples: ["https://my-dce-5kyl.eastus-1.ingest.monitor.azure.com"] + } + request: { + description: """ + Middleware settings for outbound requests. + + Various settings can be configured, such as concurrency and rate limits, timeouts, and retry behavior. + + Note that the retry backoff policy follows the Fibonacci sequence. + """ + required: false + type: object: options: { + adaptive_concurrency: { + description: """ + Configuration of adaptive concurrency parameters. + + These parameters typically do not require changes from the default, and incorrect values can lead to meta-stable or + unstable performance and sink behavior. Proceed with caution. + """ + required: false + type: object: options: { + decrease_ratio: { + description: """ + The fraction of the current value to set the new concurrency limit when decreasing the limit. + + Valid values are greater than `0` and less than `1`. Smaller values cause the algorithm to scale back rapidly + when latency increases. + + **Note**: The new limit is rounded down after applying this ratio. + """ + required: false + type: float: default: 0.9 + } + ewma_alpha: { + description: """ + The weighting of new measurements compared to older measurements. + + Valid values are greater than `0` and less than `1`. + + ARC uses an exponentially weighted moving average (EWMA) of past RTT measurements as a reference to compare with + the current RTT. Smaller values cause this reference to adjust more slowly, which may be useful if a service has + unusually high response variability. + """ + required: false + type: float: default: 0.4 + } + initial_concurrency: { + description: """ + The initial concurrency limit to use. If not specified, the initial limit is 1 (no concurrency). + + Datadog recommends setting this value to your service's average limit if you're seeing that it takes a + long time to ramp up adaptive concurrency after a restart. You can find this value by looking at the + `adaptive_concurrency_limit` metric. + """ + required: false + type: uint: default: 1 + } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit does not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } + rtt_deviation_scale: { + description: """ + Scale of RTT deviations which are not considered anomalous. + + Valid values are greater than or equal to `0`, and we expect reasonable values to range from `1.0` to `3.0`. + + When calculating the past RTT average, we also compute a secondary “deviation” value that indicates how variable + those values are. We use that deviation when comparing the past RTT average to the current measurements, so we + can ignore increases in RTT that are within an expected range. This factor is used to scale up the deviation to + an appropriate range. Larger values cause the algorithm to ignore larger increases in the RTT. + """ + required: false + type: float: default: 2.5 + } + } + } + concurrency: { + description: """ + Configuration for outbound request concurrency. + + This can be set either to one of the below enum values or to a positive integer, which denotes + a fixed concurrency limit. + """ + required: false + type: { + string: { + default: "adaptive" + enum: { + adaptive: """ + Concurrency is managed by Vector's [Adaptive Request Concurrency][arc] feature. + + [arc]: https://vector.dev/docs/about/under-the-hood/networking/arc/ + """ + none: """ + A fixed concurrency of 1. + + Only one request can be outstanding at any given time. + """ + } + } + uint: {} + } + } + rate_limit_duration_secs: { + description: "The time window used for the `rate_limit_num` option." + required: false + type: uint: { + default: 1 + unit: "seconds" + } + } + rate_limit_num: { + description: "The maximum number of requests allowed within the `rate_limit_duration_secs` time window." + required: false + type: uint: { + default: 9223372036854775807 + unit: "requests" + } + } + retry_attempts: { + description: "The maximum number of retries to make for failed requests." + required: false + type: uint: { + default: 9223372036854775807 + unit: "retries" + } + } + retry_initial_backoff_secs: { + description: """ + The amount of time to wait before attempting the first retry for a failed request. + + After the first retry has failed, the fibonacci sequence is used to select future backoffs. + """ + required: false + type: uint: { + default: 1 + unit: "seconds" + } + } + retry_jitter_mode: { + description: "The jitter mode to use for retry backoff behavior." + required: false + type: string: { + default: "Full" + enum: { + Full: """ + Full jitter. + + The random delay is anywhere from 0 up to the maximum current delay calculated by the backoff + strategy. + + Incorporating full jitter into your backoff strategy can greatly reduce the likelihood + of creating accidental denial of service (DoS) conditions against your own systems when + many clients are recovering from a failure state. + """ + None: "No jitter." + } + } + } + retry_max_duration_secs: { + description: "The maximum amount of time to wait between retries." + required: false + type: uint: { + default: 30 + unit: "seconds" + } + } + timeout_secs: { + description: """ + The time a request can take before being aborted. + + Datadog highly recommends that you do not lower this value below the service's internal timeout, as this could + create orphaned requests, pile on retries, and result in duplicate data downstream. + """ + required: false + type: uint: { + default: 60 + unit: "seconds" + } + } + } + } + stream_name: { + description: """ + The [Stream name][stream_name] for the Data collection rule. + + [stream_name]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview + """ + required: true + type: string: examples: ["Custom-MyTable"] + } + timestamp_field: { + description: """ + The destination field (column) for the timestamp. + + The setting of `log_schema.timestamp_key`, usually `timestamp`, is used as the source. + Most schemas use `TimeGenerated`, but some use `Timestamp` (legacy) or `EventStartTime` (ASIM) [std_columns]. + + [std_columns]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/log-standard-columns#timegenerated + """ + required: false + type: string: { + default: "TimeGenerated" + examples: ["EventStartTime", "Timestamp"] + } + } + tls: { + description: "TLS configuration." + required: false + type: object: options: { + alpn_protocols: { + description: """ + Sets the list of supported ALPN protocols. + + Declare the supported ALPN protocols, which are used during negotiation with a peer. They are prioritized in the order + that they are defined. + """ + required: false + type: array: items: type: string: examples: ["h2"] + } + ca_file: { + description: """ + Absolute path to an additional CA certificate file. + + The certificate must be in the DER or PEM (X.509) format. Additionally, the certificate can be provided as an inline string in PEM format. + """ + required: false + type: string: examples: ["/path/to/certificate_authority.crt"] + } + crt_file: { + description: """ + Absolute path to a certificate file used to identify this server. + + The certificate must be in DER, PEM (X.509), or PKCS#12 format. Additionally, the certificate can be provided as + an inline string in PEM format. + + If this is set _and_ is not a PKCS#12 archive, `key_file` must also be set. + """ + required: false + type: string: examples: ["/path/to/host_certificate.crt"] + } + key_file: { + description: """ + Absolute path to a private key file used to identify this server. + + The key must be in DER or PEM (PKCS#8) format. Additionally, the key can be provided as an inline string in PEM format. + """ + required: false + type: string: examples: ["/path/to/host_certificate.key"] + } + key_pass: { + description: """ + Passphrase used to unlock the encrypted key file. + + This has no effect unless `key_file` is set. + """ + required: false + type: string: examples: ["${KEY_PASS_ENV_VAR}", "PassWord1"] + } + server_name: { + description: """ + Server name to use when using Server Name Indication (SNI). + + Only relevant for outgoing connections. + """ + required: false + type: string: examples: ["www.example.com"] + } + verify_certificate: { + description: """ + Enables certificate verification. For components that create a server, this requires that the + client connections have a valid client certificate. For components that initiate requests, + this validates that the upstream has a valid certificate. + + If enabled, certificates must not be expired and must be issued by a trusted + issuer. This verification operates in a hierarchical manner, checking that the leaf certificate (the + certificate presented by the client/server) is not only valid, but that the issuer of that certificate is also valid, and + so on, until the verification process reaches a root certificate. + + Do NOT set this to `false` unless you understand the risks of not verifying the validity of certificates. + """ + required: false + type: bool: {} + } + verify_hostname: { + description: """ + Enables hostname verification. + + If enabled, the hostname used to connect to the remote host must be present in the TLS certificate presented by + the remote host, either as the Common Name or as an entry in the Subject Alternative Name extension. + + Only relevant for outgoing connections. + + Do NOT set this to `false` unless you understand the risks of not verifying the remote hostname. + """ + required: false + type: bool: {} + } + } + } + token_scope: { + description: """ + [Token scope][token_scope] for dedicated Azure regions. + + [token_scope]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview + """ + required: false + type: string: { + default: "https://monitor.azure.com/.default" + examples: ["https://monitor.azure.us/.default", "https://monitor.azure.cn/.default"] + } + } +} From 4c356a15b03edc2fcf7eb8520080053b51c14186 Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Sun, 20 Apr 2025 23:17:39 +0000 Subject: [PATCH 11/40] add changelog fragment Signed-off-by: Jed Laundry --- changelog.d/22912_add_azure_logs_ingestion.feature.md | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 changelog.d/22912_add_azure_logs_ingestion.feature.md diff --git a/changelog.d/22912_add_azure_logs_ingestion.feature.md b/changelog.d/22912_add_azure_logs_ingestion.feature.md new file mode 100644 index 0000000000000..0cc46d5e48097 --- /dev/null +++ b/changelog.d/22912_add_azure_logs_ingestion.feature.md @@ -0,0 +1,3 @@ +Add support for the Azure Monitor Logs Ingestion API through a new `azure_logs_ingestion` sink. + +authors: jlaundry From 8aab329e498da7b64ca6622c99b1605f1246c00a Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Sun, 20 Apr 2025 23:17:32 +0000 Subject: [PATCH 12/40] Update metadata check-spelling run (pull_request_target) for feature-azure_logs_ingestion Signed-off-by: check-spelling-bot on-behalf-of: @check-spelling --- .github/actions/spelling/expect.txt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/actions/spelling/expect.txt b/.github/actions/spelling/expect.txt index 3f07a7f991a9a..93327ee306768 100644 --- a/.github/actions/spelling/expect.txt +++ b/.github/actions/spelling/expect.txt @@ -16,6 +16,7 @@ appname aqf architecting archs +ASIM assertverify atag aton @@ -129,6 +130,7 @@ datasources datid datname DBserver +dcr ddagent ddev ddmetrics From 9b03078ebd8cecb3f64a28962dcc0cc3635e90dc Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Tue, 22 Apr 2025 19:48:16 +0000 Subject: [PATCH 13/40] check-fmt lint --- src/sinks/azure_logs_ingestion/config.rs | 10 +++++----- src/sinks/azure_logs_ingestion/service.rs | 19 ++++++++----------- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/src/sinks/azure_logs_ingestion/config.rs b/src/sinks/azure_logs_ingestion/config.rs index f602838e1acac..77a0baea1e7b5 100644 --- a/src/sinks/azure_logs_ingestion/config.rs +++ b/src/sinks/azure_logs_ingestion/config.rs @@ -1,4 +1,3 @@ - use std::sync::Arc; use azure_core::auth::TokenCredential; @@ -41,7 +40,9 @@ pub struct AzureLogsIngestionConfig { /// The [Data collection endpoint URI][endpoint] associated with the Log Analytics workspace. /// /// [endpoint]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview - #[configurable(metadata(docs::examples = "https://my-dce-5kyl.eastus-1.ingest.monitor.azure.com"))] + #[configurable(metadata( + docs::examples = "https://my-dce-5kyl.eastus-1.ingest.monitor.azure.com" + ))] pub endpoint: String, /// The [Data collection rule immutable ID][dcr_immutable_id] for the Data collection endpoint. @@ -117,7 +118,6 @@ impl Default for AzureLogsIngestionConfig { } impl AzureLogsIngestionConfig { - pub(super) async fn build_inner( &self, cx: SinkContext, @@ -176,7 +176,6 @@ impl_generate_config_from_default!(AzureLogsIngestionConfig); #[typetag::serde(name = "azure_logs_ingestion")] impl SinkConfig for AzureLogsIngestionConfig { async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { - let endpoint: UriSerde = self.endpoint.parse()?; self.build_inner( cx, @@ -185,7 +184,8 @@ impl SinkConfig for AzureLogsIngestionConfig { self.stream_name.clone(), self.token_scope.clone(), self.timestamp_field.clone(), - ).await + ) + .await } fn input(&self) -> Input { diff --git a/src/sinks/azure_logs_ingestion/service.rs b/src/sinks/azure_logs_ingestion/service.rs index 7c593c4562927..60ce84b7a594a 100644 --- a/src/sinks/azure_logs_ingestion/service.rs +++ b/src/sinks/azure_logs_ingestion/service.rs @@ -1,7 +1,7 @@ +use futures::executor; +use std::sync::Arc; use std::sync::LazyLock; use std::task::{Context, Poll}; -use std::sync::Arc; -use futures::executor; use azure_core::auth::TokenCredential; @@ -121,19 +121,16 @@ impl AzureLogsIngestionService { let mut request = Request::post(&self.endpoint).body(Body::from(body))?; // TODO: make this an option, for soverign clouds - let access_token = executor::block_on(self.credential - .get_token(&[&self.token_scope])) + let access_token = executor::block_on(self.credential.get_token(&[&self.token_scope])) .expect("failed to get access token from credential"); - + let bearer = format!("Bearer {}", access_token.token.secret()); *request.headers_mut() = self.default_headers.clone(); - request - .headers_mut() - .insert( - header::AUTHORIZATION, - HeaderValue::from_str(&bearer).unwrap() - ); + request.headers_mut().insert( + header::AUTHORIZATION, + HeaderValue::from_str(&bearer).unwrap(), + ); Ok(request) } From 69774cf92671d37a6be1f363a34380c2ee15064b Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Tue, 22 Apr 2025 19:49:49 +0000 Subject: [PATCH 14/40] initial website docs --- .../components/sinks/azure_logs_ingestion.cue | 79 +++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 website/cue/reference/components/sinks/azure_logs_ingestion.cue diff --git a/website/cue/reference/components/sinks/azure_logs_ingestion.cue b/website/cue/reference/components/sinks/azure_logs_ingestion.cue new file mode 100644 index 0000000000000..7e53b0e72d9d4 --- /dev/null +++ b/website/cue/reference/components/sinks/azure_logs_ingestion.cue @@ -0,0 +1,79 @@ +package metadata + +components: sinks: azure_logs_ingestion: { + title: "Azure Logs Ingestion" + + description: """ + This sink uses the Azure Monitor Logs Ingestion API to send log events to a Log Analytics Workspace. + + The `azure_identity` crate is used for authentication, which supports the standard Azure authentication types + (Workload Identity, Managed Identity, Azure CLI, Service Principal with Certificate or Secret, etc.) through + environment variables. + """ + + classes: { + commonly_used: false + delivery: "at_least_once" + development: "beta" + egress_method: "batch" + service_providers: ["Azure"] + stateful: false + } + + features: { + auto_generated: true + acknowledgements: true + healthcheck: enabled: true + send: { + batch: { + enabled: true + common: false + max_bytes: 10_000_000 + timeout_secs: 1.0 + } + compression: enabled: false + encoding: { + enabled: true + codec: enabled: false + } + proxy: enabled: true + request: enabled: false + tls: { + enabled: true + can_verify_certificate: true + can_verify_hostname: true + enabled_default: true + enabled_by_scheme: true + } + to: { + service: services.azure_logs_ingestion + + interface: { + socket: { + api: { + title: "Azure Monitor Logs Ingestion API" + url: urls.azure_logs_ingestion_endpoints + } + direction: "outgoing" + protocols: ["http"] + ssl: "required" + } + } + } + } + } + + support: { + requirements: [] + warnings: [] + notices: [] + } + + configuration: base.components.sinks.azure_logs_ingestion.configuration + + input: { + logs: true + metrics: null + traces: false + } +} From f1badac3a7ff1f0e5c0d23fdf27354a54ff35b87 Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Tue, 22 Apr 2025 20:00:15 +0000 Subject: [PATCH 15/40] add deprecation notice to azure_monitor_logs --- .../cue/reference/components/sinks/azure_monitor_logs.cue | 8 +++++++- website/cue/reference/urls.cue | 2 ++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/website/cue/reference/components/sinks/azure_monitor_logs.cue b/website/cue/reference/components/sinks/azure_monitor_logs.cue index 1d421b0f63ada..b1ad0e0f6b8e3 100644 --- a/website/cue/reference/components/sinks/azure_monitor_logs.cue +++ b/website/cue/reference/components/sinks/azure_monitor_logs.cue @@ -57,7 +57,13 @@ components: sinks: azure_monitor_logs: { support: { requirements: [] - warnings: [] + warnings: [ + """ + The upstream Data Collector API [has been deprecated](urls.azure_monitor_data_collector_deprecation), + and will stop working in September 2026. Consider migrating to the `azure_logs_ingestion` sink, which + requires additional Azure resources. + """ + ] notices: [] } diff --git a/website/cue/reference/urls.cue b/website/cue/reference/urls.cue index 2578c0b38c60a..00c627226c080 100644 --- a/website/cue/reference/urls.cue +++ b/website/cue/reference/urls.cue @@ -95,6 +95,8 @@ urls: { azure_blob_endpoints: "https://docs.microsoft.com/en-us/rest/api/storageservices/blob-service-rest-api" azure_monitor: "https://azure.microsoft.com/en-us/services/monitor/" azure_monitor_logs_endpoints: "https://docs.microsoft.com/en-us/rest/api/monitor/" + azure_monitor_data_collector_deprecation: "https://learn.microsoft.com/previous-versions/azure/azure-monitor/logs/data-collector-api" + azure_logs_ingestion_endpoints: "https://learn.microsoft.com/azure/azure-monitor/logs/logs-ingestion-api-overview" base16: "\(wikipedia)/wiki/Hexadecimal" base64: "\(wikipedia)/wiki/Base64" base64_padding: "\(wikipedia)/wiki/Base64#Output_padding" From 2a16d3a810f4f539fe634cea300745457cb5759c Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Tue, 22 Apr 2025 20:02:33 +0000 Subject: [PATCH 16/40] add deprecation notice to azure_monitor_logs --- website/cue/reference/components/sinks/azure_monitor_logs.cue | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/website/cue/reference/components/sinks/azure_monitor_logs.cue b/website/cue/reference/components/sinks/azure_monitor_logs.cue index b1ad0e0f6b8e3..a982b44a8bc1a 100644 --- a/website/cue/reference/components/sinks/azure_monitor_logs.cue +++ b/website/cue/reference/components/sinks/azure_monitor_logs.cue @@ -6,7 +6,7 @@ components: sinks: azure_monitor_logs: { classes: { commonly_used: false delivery: "at_least_once" - development: "beta" + development: "deprecated" egress_method: "batch" service_providers: ["Azure"] stateful: false From 972c817dd5515e9738e0343fd2ea82496ab32101 Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Fri, 2 May 2025 20:41:05 +0000 Subject: [PATCH 17/40] remove debug comments --- src/sinks/azure_logs_ingestion/service.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/sinks/azure_logs_ingestion/service.rs b/src/sinks/azure_logs_ingestion/service.rs index 60ce84b7a594a..1544c71a8f15f 100644 --- a/src/sinks/azure_logs_ingestion/service.rs +++ b/src/sinks/azure_logs_ingestion/service.rs @@ -23,8 +23,6 @@ const CONTENT_TYPE: &str = "application/json"; static CONTENT_TYPE_VALUE: LazyLock = LazyLock::new(|| HeaderValue::from_static(CONTENT_TYPE)); -// static X_MS_CLIENT_REQUEST_ID_HEADER: LazyLock = -// LazyLock::new(|| HeaderName::from_static("x-ms-client-request-id")); #[derive(Debug, Clone)] pub struct AzureLogsIngestionRequest { @@ -94,7 +92,6 @@ impl AzureLogsIngestionService { ) -> crate::Result { let mut parts = endpoint.into_parts(); parts.path_and_query = Some( - // https://my-dce-5kyl.eastus-1.ingest.monitor.azure.com/dataCollectionRules/dcr-000a00a000a00000a000000aa000a0aa/streams/Custom-MyTable?api-version=2023-01-01 format!("/dataCollectionRules/{dcr_immutable_id}/streams/{stream_name}?api-version={API_VERSION}") .parse() .expect("path and query should never fail to parse"), From 4619668a1b29ba6336170ae5795c0e927dcb80ab Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Tue, 13 May 2025 06:38:52 +1200 Subject: [PATCH 18/40] Update website/cue/reference/components/sinks/base/azure_logs_ingestion.cue Co-authored-by: Rosa Trieu <107086888+rtrieu@users.noreply.github.com> --- .../components/sinks/generated/azure_logs_ingestion.cue | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/website/cue/reference/components/sinks/generated/azure_logs_ingestion.cue b/website/cue/reference/components/sinks/generated/azure_logs_ingestion.cue index 3251ef421da3c..364e09fb98224 100644 --- a/website/cue/reference/components/sinks/generated/azure_logs_ingestion.cue +++ b/website/cue/reference/components/sinks/generated/azure_logs_ingestion.cue @@ -36,7 +36,7 @@ base: components: sinks: azure_logs_ingestion: configuration: { The maximum size of a batch that is processed by a sink. This is based on the uncompressed size of the batched events, before they are - serialized/compressed. + serialized or compressed. """ required: false type: uint: { From 994121aecd0a75c3542c41e79abfba0c4ca7c4f2 Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Mon, 12 May 2025 18:52:13 +0000 Subject: [PATCH 19/40] expand on what Azure resources are required Signed-off-by: Jed Laundry --- website/cue/reference/components/sinks/azure_monitor_logs.cue | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/website/cue/reference/components/sinks/azure_monitor_logs.cue b/website/cue/reference/components/sinks/azure_monitor_logs.cue index a982b44a8bc1a..4df5dedc1bca8 100644 --- a/website/cue/reference/components/sinks/azure_monitor_logs.cue +++ b/website/cue/reference/components/sinks/azure_monitor_logs.cue @@ -61,7 +61,7 @@ components: sinks: azure_monitor_logs: { """ The upstream Data Collector API [has been deprecated](urls.azure_monitor_data_collector_deprecation), and will stop working in September 2026. Consider migrating to the `azure_logs_ingestion` sink, which - requires additional Azure resources. + requires creating Data Collection Endpoint and Data Collection Rule resources in Azure. """ ] notices: [] From d45bbf4874ed8566354a3e035cf6c9894cc89221 Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Tue, 13 May 2025 10:26:06 +0000 Subject: [PATCH 20/40] missing cue service Signed-off-by: Jed Laundry --- .../reference/components/sinks/azure_monitor_logs.cue | 8 ++++---- .../cue/reference/services/azure_logs_ingestion.cue | 10 ++++++++++ 2 files changed, 14 insertions(+), 4 deletions(-) create mode 100644 website/cue/reference/services/azure_logs_ingestion.cue diff --git a/website/cue/reference/components/sinks/azure_monitor_logs.cue b/website/cue/reference/components/sinks/azure_monitor_logs.cue index 4df5dedc1bca8..e764495d9304b 100644 --- a/website/cue/reference/components/sinks/azure_monitor_logs.cue +++ b/website/cue/reference/components/sinks/azure_monitor_logs.cue @@ -59,10 +59,10 @@ components: sinks: azure_monitor_logs: { requirements: [] warnings: [ """ - The upstream Data Collector API [has been deprecated](urls.azure_monitor_data_collector_deprecation), - and will stop working in September 2026. Consider migrating to the `azure_logs_ingestion` sink, which - requires creating Data Collection Endpoint and Data Collection Rule resources in Azure. - """ + The upstream Data Collector API [has been deprecated](urls.azure_monitor_data_collector_deprecation), + and will stop working in September 2026. Consider migrating to the `azure_logs_ingestion` sink, which + requires creating Data Collection Endpoint and Data Collection Rule resources in Azure. + """, ] notices: [] } diff --git a/website/cue/reference/services/azure_logs_ingestion.cue b/website/cue/reference/services/azure_logs_ingestion.cue new file mode 100644 index 0000000000000..45f76912ee66f --- /dev/null +++ b/website/cue/reference/services/azure_logs_ingestion.cue @@ -0,0 +1,10 @@ +package metadata + +services: azure_logs_ingestion: { + name: "Azure Logs Ingestion" + thing: "a \(name) account" + url: urls.azure_monitor + versions: null + + description: "[Azure Monitor](\(urls.azure_monitor)) is a service in Azure that provides performance and availability monitoring for applications and services in Azure, other cloud environments, or on-premises. Azure Monitor collects data from multiple sources into a common data platform (Log Analytics) where it can be analyzed for trends and anomalies." +} From 913c24ef892b9c488de84a03131ffce1443182f6 Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Sat, 6 Dec 2025 19:53:19 +0000 Subject: [PATCH 21/40] rebase, test with explicit credentials Signed-off-by: Jed Laundry --- src/sinks/azure_logs_ingestion/config.rs | 30 ++++++++++++++++++++--- src/sinks/azure_logs_ingestion/service.rs | 7 +++--- 2 files changed, 31 insertions(+), 6 deletions(-) diff --git a/src/sinks/azure_logs_ingestion/config.rs b/src/sinks/azure_logs_ingestion/config.rs index 77a0baea1e7b5..6f9dea703fbd0 100644 --- a/src/sinks/azure_logs_ingestion/config.rs +++ b/src/sinks/azure_logs_ingestion/config.rs @@ -1,6 +1,7 @@ use std::sync::Arc; -use azure_core::auth::TokenCredential; +use azure_core::credentials::TokenCredential; +use azure_identity::ClientSecretCredential; use vector_lib::configurable::configurable_component; use vector_lib::schema; use vrl::value::Kind; @@ -57,6 +58,18 @@ pub struct AzureLogsIngestionConfig { #[configurable(metadata(docs::examples = "Custom-MyTable"))] pub stream_name: String, + /// The [Azure Tenant ID][azure_tenant_id]. + #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))] + pub azure_tenant_id: String, + + /// The [Azure Client ID][azure_client_id]. + #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))] + pub azure_client_id: String, + + /// The [Azure Client Secret][azure_client_secret]. + #[configurable(metadata(docs::examples = "00-00~000000-0000000~0000000000000000000"))] + pub azure_client_secret: String, + /// [Token scope][token_scope] for dedicated Azure regions. /// /// [token_scope]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview @@ -106,6 +119,9 @@ impl Default for AzureLogsIngestionConfig { endpoint: Default::default(), dcr_immutable_id: Default::default(), stream_name: Default::default(), + azure_tenant_id: Default::default(), + azure_client_id: Default::default(), + azure_client_secret: Default::default(), token_scope: default_scope(), timestamp_field: default_timestamp_field(), encoding: Default::default(), @@ -124,6 +140,7 @@ impl AzureLogsIngestionConfig { endpoint: UriSerde, dcr_immutable_id: String, stream_name: String, + credential: Arc, token_scope: String, timestamp_field: String, ) -> crate::Result<(VectorSink, Healthcheck)> { @@ -136,8 +153,6 @@ impl AzureLogsIngestionConfig { .limit_max_bytes(MAX_BATCH_SIZE)? .into_batcher_settings()?; - let credential: Arc = azure_identity::create_credential()?; - let tls_settings = TlsSettings::from_options(self.tls.as_ref())?; let client = HttpClient::new(Some(tls_settings), &cx.proxy)?; @@ -177,11 +192,20 @@ impl_generate_config_from_default!(AzureLogsIngestionConfig); impl SinkConfig for AzureLogsIngestionConfig { async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { let endpoint: UriSerde = self.endpoint.parse()?; + + let credential: Arc = ClientSecretCredential::new( + &self.azure_tenant_id.clone(), + self.azure_client_id.clone(), + self.azure_client_secret.clone().into(), + None, + )?; + self.build_inner( cx, endpoint, self.dcr_immutable_id.clone(), self.stream_name.clone(), + credential, self.token_scope.clone(), self.timestamp_field.clone(), ) diff --git a/src/sinks/azure_logs_ingestion/service.rs b/src/sinks/azure_logs_ingestion/service.rs index 1544c71a8f15f..d9762a3294a7d 100644 --- a/src/sinks/azure_logs_ingestion/service.rs +++ b/src/sinks/azure_logs_ingestion/service.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use std::sync::LazyLock; use std::task::{Context, Poll}; -use azure_core::auth::TokenCredential; +use azure_core::credentials::TokenCredential; use bytes::Bytes; use http::{ @@ -118,8 +118,9 @@ impl AzureLogsIngestionService { let mut request = Request::post(&self.endpoint).body(Body::from(body))?; // TODO: make this an option, for soverign clouds - let access_token = executor::block_on(self.credential.get_token(&[&self.token_scope])) - .expect("failed to get access token from credential"); + let access_token = executor::block_on( + self.credential.get_token(&[&self.token_scope], None) + ).expect("failed to get access token from credential"); let bearer = format!("Bearer {}", access_token.token.secret()); From dc2059d0a96bb32db1b87995235190b78fade25f Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Sun, 7 Dec 2025 18:22:01 +0000 Subject: [PATCH 22/40] move the auth config into a struct Signed-off-by: Jed Laundry --- src/sinks/azure_logs_ingestion/config.rs | 117 ++++++++++++++---- .../sinks/generated/azure_logs_ingestion.cue | 35 ++++++ 2 files changed, 131 insertions(+), 21 deletions(-) diff --git a/src/sinks/azure_logs_ingestion/config.rs b/src/sinks/azure_logs_ingestion/config.rs index 6f9dea703fbd0..f50997d9b7fad 100644 --- a/src/sinks/azure_logs_ingestion/config.rs +++ b/src/sinks/azure_logs_ingestion/config.rs @@ -1,7 +1,17 @@ use std::sync::Arc; use azure_core::credentials::TokenCredential; -use azure_identity::ClientSecretCredential; +use azure_core::{ + error::ErrorKind, + Error, +}; + +use azure_identity::{ + AzureCliCredential, + ClientSecretCredential, + ManagedIdentityCredential, + WorkloadIdentityCredential, +}; use vector_lib::configurable::configurable_component; use vector_lib::schema; use vrl::value::Kind; @@ -58,17 +68,9 @@ pub struct AzureLogsIngestionConfig { #[configurable(metadata(docs::examples = "Custom-MyTable"))] pub stream_name: String, - /// The [Azure Tenant ID][azure_tenant_id]. - #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))] - pub azure_tenant_id: String, - - /// The [Azure Client ID][azure_client_id]. - #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))] - pub azure_client_id: String, - - /// The [Azure Client Secret][azure_client_secret]. - #[configurable(metadata(docs::examples = "00-00~000000-0000000~0000000000000000000"))] - pub azure_client_secret: String, + #[configurable(derived)] + #[serde(default)] + pub auth: AzureAuthentication, /// [Token scope][token_scope] for dedicated Azure regions. /// @@ -119,9 +121,7 @@ impl Default for AzureLogsIngestionConfig { endpoint: Default::default(), dcr_immutable_id: Default::default(), stream_name: Default::default(), - azure_tenant_id: Default::default(), - azure_client_id: Default::default(), - azure_client_secret: Default::default(), + auth: Default::default(), token_scope: default_scope(), timestamp_field: default_timestamp_field(), encoding: Default::default(), @@ -133,6 +133,85 @@ impl Default for AzureLogsIngestionConfig { } } +mod azure_credential_kinds { + #[cfg(not(target_arch = "wasm32"))] + pub const AZURE_CLI: &str = "azurecli"; + pub const MANAGED_IDENTITY: &str = "managedidentity"; + pub const WORKLOAD_IDENTITY: &str = "workloadidentity"; +} + +/// Configuration of the authentication strategy for interacting with Azure services. +#[configurable_component] +#[derive(Clone, Debug, Derivative, Eq, PartialEq)] +#[derivative(Default)] +#[serde(deny_unknown_fields, untagged)] +pub enum AzureAuthentication { + /// Use client credentials + #[derivative(Default)] + ClientSecretCredential { + /// The [Azure Tenant ID][azure_tenant_id]. + #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))] + azure_tenant_id: String, + + /// The [Azure Client ID][azure_client_id]. + #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))] + azure_client_id: String, + + /// The [Azure Client Secret][azure_client_secret]. + #[configurable(metadata(docs::examples = "00-00~000000-0000000~0000000000000000000"))] + azure_client_secret: String, + }, + + /// Use credentials from environment variables + SpecificAzureCredential { + /// The kind of Azure credential to use. + #[configurable(metadata(docs::examples = "azurecli"))] + #[configurable(metadata(docs::examples = "managedidentity"))] + #[configurable(metadata(docs::examples = "workloadidentity"))] + azure_credential_kind: String, + } +} + +impl AzureAuthentication { + /// Returns the provider for the credentials based on the authentication mechanism chosen. + pub async fn credential( + &self, + ) -> azure_core::Result> { + match self { + Self::ClientSecretCredential { + azure_tenant_id, + azure_client_id, + azure_client_secret, + } => { + let credential = ClientSecretCredential::new( + &azure_tenant_id.clone(), + azure_client_id.clone(), + azure_client_secret.clone().into(), + None, + )?; + Ok(credential) + } + + Self::SpecificAzureCredential { + azure_credential_kind, + } => { + let credential: Arc = match azure_credential_kind.replace(' ', "").to_lowercase().as_str() { + #[cfg(not(target_arch = "wasm32"))] + azure_credential_kinds::AZURE_CLI => AzureCliCredential::new(None)?, + azure_credential_kinds::MANAGED_IDENTITY => ManagedIdentityCredential::new(None)?, + azure_credential_kinds::WORKLOAD_IDENTITY => WorkloadIdentityCredential::new(None)?, + _ => { + return Err(Error::with_message(ErrorKind::Credential, || { + format!("unknown/unsupported azure_credential_kind `{}`", azure_credential_kind) + })) + } + }; + Ok(credential) + } + } + } +} + impl AzureLogsIngestionConfig { pub(super) async fn build_inner( &self, @@ -193,12 +272,8 @@ impl SinkConfig for AzureLogsIngestionConfig { async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { let endpoint: UriSerde = self.endpoint.parse()?; - let credential: Arc = ClientSecretCredential::new( - &self.azure_tenant_id.clone(), - self.azure_client_id.clone(), - self.azure_client_secret.clone().into(), - None, - )?; + let credential: Arc = self.auth.credential().await + .expect("Failed to create credential"); self.build_inner( cx, diff --git a/website/cue/reference/components/sinks/generated/azure_logs_ingestion.cue b/website/cue/reference/components/sinks/generated/azure_logs_ingestion.cue index 364e09fb98224..4baefab544ec6 100644 --- a/website/cue/reference/components/sinks/generated/azure_logs_ingestion.cue +++ b/website/cue/reference/components/sinks/generated/azure_logs_ingestion.cue @@ -27,6 +27,41 @@ base: components: sinks: azure_logs_ingestion: configuration: { type: bool: {} } } + auth: { + description: "Configuration of the authentication strategy for interacting with Azure services." + required: false + type: object: options: { + azure_client_id: { + description: "The [Azure Client ID][azure_client_id]." + required: false + type: string: { + default: "" + examples: ["00000000-0000-0000-0000-000000000000"] + } + } + azure_client_secret: { + description: "The [Azure Client Secret][azure_client_secret]." + required: false + type: string: { + default: "" + examples: ["00-00~000000-0000000~0000000000000000000"] + } + } + azure_credential_kind: { + description: "The kind of Azure credential to use." + required: true + type: string: examples: ["azurecli", "managedidentity", "workloadidentity"] + } + azure_tenant_id: { + description: "The [Azure Tenant ID][azure_tenant_id]." + required: false + type: string: { + default: "" + examples: ["00000000-0000-0000-0000-000000000000"] + } + } + } + } batch: { description: "Event batching behavior." required: false From e8558cc92e1733e3b95a912c8963e5db9343b505 Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Sun, 7 Dec 2025 18:22:23 +0000 Subject: [PATCH 23/40] doc updates from upstream template changes Signed-off-by: Jed Laundry --- .../sinks/generated/azure_logs_ingestion.cue | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/website/cue/reference/components/sinks/generated/azure_logs_ingestion.cue b/website/cue/reference/components/sinks/generated/azure_logs_ingestion.cue index 4baefab544ec6..e8ac51c090ba3 100644 --- a/website/cue/reference/components/sinks/generated/azure_logs_ingestion.cue +++ b/website/cue/reference/components/sinks/generated/azure_logs_ingestion.cue @@ -1,18 +1,18 @@ package metadata -base: components: sinks: azure_logs_ingestion: configuration: { +generated: components: sinks: azure_logs_ingestion: configuration: { acknowledgements: { description: """ Controls how acknowledgements are handled for this sink. See [End-to-end Acknowledgements][e2e_acks] for more information on how event acknowledgement is handled. - [e2e_acks]: https://vector.dev/docs/about/under-the-hood/architecture/end-to-end-acknowledgements/ + [e2e_acks]: https://vector.dev/docs/architecture/end-to-end-acknowledgements/ """ required: false type: object: options: enabled: { description: """ - Whether or not end-to-end acknowledgements are enabled. + Controls whether or not end-to-end acknowledgements are enabled. When enabled for a sink, any source that supports end-to-end acknowledgements that is connected to that sink waits for events @@ -126,7 +126,7 @@ base: components: sinks: azure_logs_ingestion: configuration: { unix_float: "Represent the timestamp as a Unix timestamp in floating point." unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds." unix_ns: "Represent the timestamp as a Unix timestamp in nanoseconds." - unix_us: "Represent the timestamp as a Unix timestamp in microseconds" + unix_us: "Represent the timestamp as a Unix timestamp in microseconds." } } } @@ -208,12 +208,12 @@ base: components: sinks: azure_logs_ingestion: configuration: { description: """ Scale of RTT deviations which are not considered anomalous. - Valid values are greater than or equal to `0`, and we expect reasonable values to range from `1.0` to `3.0`. + Valid values are greater than or equal to `0`, and reasonable values range from `1.0` to `3.0`. - When calculating the past RTT average, we also compute a secondary “deviation” value that indicates how variable - those values are. We use that deviation when comparing the past RTT average to the current measurements, so we + When calculating the past RTT average, a secondary “deviation” value is also computed that indicates how variable + those values are. That deviation is used when comparing the past RTT average to the current measurements, so we can ignore increases in RTT that are within an expected range. This factor is used to scale up the deviation to - an appropriate range. Larger values cause the algorithm to ignore larger increases in the RTT. + an appropriate range. Larger values cause the algorithm to ignore larger increases in the RTT. """ required: false type: float: default: 2.5 @@ -235,7 +235,7 @@ base: components: sinks: azure_logs_ingestion: configuration: { adaptive: """ Concurrency is managed by Vector's [Adaptive Request Concurrency][arc] feature. - [arc]: https://vector.dev/docs/about/under-the-hood/networking/arc/ + [arc]: https://vector.dev/docs/architecture/arc/ """ none: """ A fixed concurrency of 1. @@ -275,7 +275,7 @@ base: components: sinks: azure_logs_ingestion: configuration: { description: """ The amount of time to wait before attempting the first retry for a failed request. - After the first retry has failed, the fibonacci sequence is used to select future backoffs. + After the first retry has failed, the Fibonacci sequence is used to select future backoffs. """ required: false type: uint: { From 3bba5f25c4570bf82312f856a4369d9cfde187d5 Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Sun, 7 Dec 2025 18:23:42 +0000 Subject: [PATCH 24/40] cleanup Cargo.lock change from before rebase Signed-off-by: Jed Laundry --- Cargo.lock | 22 +--------------------- 1 file changed, 1 insertion(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4a541b971649e..ff9e8907b9c2f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -880,25 +880,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "async-process" -version = "2.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63255f1dc2381611000436537bbedfe83183faa303a5a0edaf191edef06526bb" -dependencies = [ - "async-channel 2.3.1", - "async-io 2.1.0", - "async-lock 3.4.0", - "async-signal", - "async-task", - "blocking", - "cfg-if", - "event-listener 5.3.1", - "futures-lite 2.4.0", - "rustix 0.38.37", - "tracing 0.1.40", -] - [[package]] name = "async-reactor-trait" version = "1.1.0" @@ -1755,7 +1736,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "82c33c072c9d87777262f35abfe2a64b609437076551d4dac8373e60f0e3fde9" dependencies = [ "async-lock 3.4.0", - "async-process 2.3.0", "async-trait", "bytes 1.10.1", "futures 0.3.31", @@ -10478,7 +10458,7 @@ dependencies = [ "async-io 1.13.0", "async-lock 2.8.0", "async-net", - "async-process 1.8.1", + "async-process", "blocking", "futures-lite 1.13.0", ] From 745db14c899f79397cadf73a7bcf56f6e049e1f0 Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Mon, 8 Dec 2025 17:43:00 +0000 Subject: [PATCH 25/40] change azure_client_secret to SensitiveString Signed-off-by: Jed Laundry --- src/sinks/azure_logs_ingestion/config.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/sinks/azure_logs_ingestion/config.rs b/src/sinks/azure_logs_ingestion/config.rs index f50997d9b7fad..8b32de24a6446 100644 --- a/src/sinks/azure_logs_ingestion/config.rs +++ b/src/sinks/azure_logs_ingestion/config.rs @@ -12,8 +12,11 @@ use azure_identity::{ ManagedIdentityCredential, WorkloadIdentityCredential, }; -use vector_lib::configurable::configurable_component; -use vector_lib::schema; +use vector_lib::{ + schema, + configurable::configurable_component, + sensitive_string::SensitiveString, +}; use vrl::value::Kind; use crate::{ @@ -159,7 +162,7 @@ pub enum AzureAuthentication { /// The [Azure Client Secret][azure_client_secret]. #[configurable(metadata(docs::examples = "00-00~000000-0000000~0000000000000000000"))] - azure_client_secret: String, + azure_client_secret: SensitiveString, }, /// Use credentials from environment variables @@ -183,10 +186,11 @@ impl AzureAuthentication { azure_client_id, azure_client_secret, } => { + let secret: String = azure_client_secret.inner().into(); let credential = ClientSecretCredential::new( &azure_tenant_id.clone(), azure_client_id.clone(), - azure_client_secret.clone().into(), + secret.into(), None, )?; Ok(credential) From 7c064c3f25e09e9b6a63436b9d80baa2ee535abe Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Mon, 8 Dec 2025 18:22:25 +0000 Subject: [PATCH 26/40] basic client credentials unit test Signed-off-by: Jed Laundry --- src/sinks/azure_logs_ingestion/mod.rs | 4 +-- src/sinks/azure_logs_ingestion/tests.rs | 46 +++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 2 deletions(-) create mode 100644 src/sinks/azure_logs_ingestion/tests.rs diff --git a/src/sinks/azure_logs_ingestion/mod.rs b/src/sinks/azure_logs_ingestion/mod.rs index dddcfc6fa586d..292bde918d227 100644 --- a/src/sinks/azure_logs_ingestion/mod.rs +++ b/src/sinks/azure_logs_ingestion/mod.rs @@ -7,7 +7,7 @@ mod config; mod service; mod sink; -// #[cfg(test)] -// mod tests; +#[cfg(test)] +mod tests; pub use config::AzureLogsIngestionConfig; diff --git a/src/sinks/azure_logs_ingestion/tests.rs b/src/sinks/azure_logs_ingestion/tests.rs new file mode 100644 index 0000000000000..ec2cd0cf1893b --- /dev/null +++ b/src/sinks/azure_logs_ingestion/tests.rs @@ -0,0 +1,46 @@ + +use super::config::AzureLogsIngestionConfig; + + +#[test] +fn generate_config() { + crate::test_util::test_generate_config::(); +} + +#[test] +fn basic_config_with_client_credentials() { + let config: AzureLogsIngestionConfig = toml::from_str::( + r#" + endpoint = "https://my-dce-5kyl.eastus-1.ingest.monitor.azure.com" + dcr_immutable_id = "dcr-00000000000000000000000000000000" + stream_name = "Custom-UnitTest" + + [auth] + azure_tenant_id = "00000000-0000-0000-0000-000000000000" + azure_client_id = "mock-client-id" + azure_client_secret = "mock-client-secret" + "#) + .expect("Config parsing failed"); + + assert_eq!(config.endpoint, "https://my-dce-5kyl.eastus-1.ingest.monitor.azure.com"); + assert_eq!(config.dcr_immutable_id, "dcr-00000000000000000000000000000000"); + assert_eq!(config.stream_name, "Custom-UnitTest"); + assert_eq!(config.token_scope, "https://monitor.azure.com/.default"); + assert_eq!(config.timestamp_field, "TimeGenerated"); + + match &config.auth { + crate::sinks::azure_logs_ingestion::config::AzureAuthentication::ClientSecretCredential { + azure_tenant_id, + azure_client_id, + azure_client_secret, + } => { + assert_eq!(azure_tenant_id, "00000000-0000-0000-0000-000000000000"); + assert_eq!(azure_client_id, "mock-client-id"); + let secret: String = azure_client_secret.inner().into(); + assert_eq!(secret, "mock-client-secret"); + } + _ => panic!("Expected ClientSecretCredential variant"), + } +} + +// TODO test config with ManagedIdentity (will need to mock env vars...) From 7a8f1b969a773b024063adfc721a4919335e654f Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Mon, 8 Dec 2025 19:07:41 +0000 Subject: [PATCH 27/40] full request unit test Signed-off-by: Jed Laundry --- src/sinks/azure_logs_ingestion/tests.rs | 158 ++++++++++++++++++++++++ 1 file changed, 158 insertions(+) diff --git a/src/sinks/azure_logs_ingestion/tests.rs b/src/sinks/azure_logs_ingestion/tests.rs index ec2cd0cf1893b..cfb5dd3ed03ed 100644 --- a/src/sinks/azure_logs_ingestion/tests.rs +++ b/src/sinks/azure_logs_ingestion/tests.rs @@ -1,6 +1,24 @@ +use std::time::Duration; +use futures::stream; +use http::Response; +use hyper::body; +use tokio::time::timeout; +use vector_lib::config::log_schema; + +use azure_core::credentials::TokenCredential; +use azure_identity::{ClientSecretCredential, ClientSecretCredentialOptions, TokenCredentialOptions}; use super::config::AzureLogsIngestionConfig; +use crate::{ + event::LogEvent, + sinks::prelude::*, + test_util::{ + components::{run_and_assert_sink_compliance, SINK_TAGS}, + http::spawn_blackhole_http_server, + }, +}; + #[test] fn generate_config() { @@ -44,3 +62,143 @@ fn basic_config_with_client_credentials() { } // TODO test config with ManagedIdentity (will need to mock env vars...) + +fn insert_timestamp_kv(log: &mut LogEvent) -> (String, String) { + let now = chrono::Utc::now(); + + let timestamp_value = now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true); + log.insert(log_schema().timestamp_key_target_path().unwrap(), now); + + ( + log_schema().timestamp_key().unwrap().to_string(), + timestamp_value, + ) +} + +#[tokio::test] +async fn correct_request() { + + // We need to run our own mock OAuth endpoint as well + let (authority_tx, mut _authority_rx) = tokio::sync::mpsc::channel(1); + let mock_token_authority = spawn_blackhole_http_server(move |request| { + let authority_tx = authority_tx.clone(); + async move { + authority_tx.send(request).await.unwrap(); + let body = serde_json::json!({ + "access_token": "mock-access-token", + "token_type": "Bearer", + "expires_in": 3600 + }).to_string(); + + Ok(Response::builder() + .header("Content-Type", "application/json") + .body(body.into()) + .unwrap()) + } + }) + .await; + + println!("Mock token authority running at {}", mock_token_authority.to_string()); + + let mut credential_options = TokenCredentialOptions::default(); + //credential_options.set_authority_host("http://127.0.0.1:9001".into()); + credential_options.set_authority_host(mock_token_authority.to_string()); + + let credential: std::sync::Arc = ClientSecretCredential::new( + "00000000-0000-0000-0000-000000000000", + "mock-client-id".into(), + "mock-client-secret".into(), + Some(ClientSecretCredentialOptions { + credential_options: credential_options, + }), + ) + .expect("failed to create ClientSecretCredential"); + + println!("Created ClientSecretCredential"); + + println!("Initial access token: {:?}", + credential.get_token( + &["https://monitor.azure.com/.default"], + None + ).await + .expect("failed to get initial access token") + ); + + let config: AzureLogsIngestionConfig = toml::from_str( + r#" + endpoint = "http://localhost:9001" + dcr_immutable_id = "dcr-00000000000000000000000000000000" + stream_name = "Custom-UnitTest" + + [auth] + azure_tenant_id = "00000000-0000-0000-0000-000000000000" + azure_client_id = "mock-client-id" + azure_client_secret = "mock-client-secret" + "#) + .unwrap(); + + let mut log1 = [("message", "hello")].iter().copied().collect::(); + let (_timestamp_key1, timestamp_value1) = insert_timestamp_kv(&mut log1); + + let mut log2 = [("message", "world")].iter().copied().collect::(); + let (_timestamp_key2, timestamp_value2) = insert_timestamp_kv(&mut log2); + + let (endpoint_tx, mut endpoint_rx) = tokio::sync::mpsc::channel(1); + let mock_endpoint = spawn_blackhole_http_server(move |request| { + let endpoint_tx = endpoint_tx.clone(); + async move { + endpoint_tx.send(request).await.unwrap(); + Ok(Response::new(hyper::Body::empty())) + } + }) + .await; + + let context = SinkContext::default(); + + let (sink, _healthcheck) = config + .build_inner( + context, + mock_endpoint.into(), + config.dcr_immutable_id.clone(), + config.stream_name.clone(), + credential, + config.token_scope.clone(), + config.timestamp_field.clone(), + ) + .await + .unwrap(); + + run_and_assert_sink_compliance(sink, stream::iter(vec![log1, log2]), &SINK_TAGS).await; + + let request = timeout(Duration::from_millis(500), endpoint_rx.recv()) + .await + .unwrap() + .unwrap(); + + let (parts, body) = request.into_parts(); + assert_eq!(&parts.method.to_string(), "POST"); + + let body = body::to_bytes(body).await.unwrap(); + let body_json: serde_json::Value = serde_json::from_slice(&body[..]).unwrap(); + let expected_json = serde_json::json!([ + { + "TimeGenerated": timestamp_value1, + "message": "hello" + }, + { + "TimeGenerated": timestamp_value2, + "message": "world" + } + ]); + assert_eq!(body_json, expected_json); + + let headers = parts.headers; + let authorization = headers.get("Authorization").unwrap(); + assert_eq!(authorization.to_str().unwrap(), "Bearer mock-access-token"); + + assert_eq!( + &parts.uri.path_and_query().unwrap().to_string(), + "/dataCollectionRules/dcr-00000000000000000000000000000000/streams/Custom-UnitTest?api-version=2023-01-01" + ); + +} From fa31afdd6e5c739323fe04ab9ea4ed3ce6ef38bb Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Mon, 8 Dec 2025 23:19:51 +0000 Subject: [PATCH 28/40] remove executor::block_on Signed-off-by: Jed Laundry --- src/sinks/azure_logs_ingestion/service.rs | 30 +++++++++++++---------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/src/sinks/azure_logs_ingestion/service.rs b/src/sinks/azure_logs_ingestion/service.rs index d9762a3294a7d..07f1d63cb9212 100644 --- a/src/sinks/azure_logs_ingestion/service.rs +++ b/src/sinks/azure_logs_ingestion/service.rs @@ -1,9 +1,11 @@ -use futures::executor; use std::sync::Arc; use std::sync::LazyLock; use std::task::{Context, Poll}; -use azure_core::credentials::TokenCredential; +use azure_core::credentials::{ + AccessToken, + TokenCredential, +}; use bytes::Bytes; use http::{ @@ -114,13 +116,15 @@ impl AzureLogsIngestionService { }) } - fn build_request(&self, body: Bytes) -> crate::Result> { + async fn build_request(&self, body: Bytes) -> crate::Result> { let mut request = Request::post(&self.endpoint).body(Body::from(body))?; - // TODO: make this an option, for soverign clouds - let access_token = executor::block_on( - self.credential.get_token(&[&self.token_scope], None) - ).expect("failed to get access token from credential"); + // TODO: make this an option, for sovereign clouds + let access_token: AccessToken = self + .credential + .get_token(&[&self.token_scope], None) + .await + .expect("failed to get access token from credential"); let bearer = format!("Bearer {}", access_token.token.secret()); @@ -134,10 +138,10 @@ impl AzureLogsIngestionService { } pub fn healthcheck(&self) -> Healthcheck { - let mut client = self.client.clone(); - let request = self.build_request(Bytes::from("[]")); + let service = self.clone(); + let mut client = service.client.clone(); Box::pin(async move { - let request = request?; + let request = service.build_request(Bytes::from("[]")).await?; let res = client.call(request).in_current_span().await?; if res.status().is_server_error() { @@ -180,10 +184,10 @@ impl Service for AzureLogsIngestionService { // Emission of Error internal event is handled upstream by the caller. fn call(&mut self, request: AzureLogsIngestionRequest) -> Self::Future { - let mut client = self.client.clone(); - let http_request = self.build_request(request.body); + let service = self.clone(); + let mut client = service.client.clone(); Box::pin(async move { - let http_request = http_request?; + let http_request = service.build_request(request.body).await?; let response = client.call(http_request).in_current_span().await?; Ok(AzureLogsIngestionResponse { http_status: response.status(), From 377d1a5fc62fc507292c5c9760924266c2412b04 Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Fri, 12 Dec 2025 20:49:47 +0000 Subject: [PATCH 29/40] add healthcheck test Signed-off-by: Jed Laundry --- src/sinks/azure_logs_ingestion/tests.rs | 103 +++++++++++++++++++++--- 1 file changed, 91 insertions(+), 12 deletions(-) diff --git a/src/sinks/azure_logs_ingestion/tests.rs b/src/sinks/azure_logs_ingestion/tests.rs index cfb5dd3ed03ed..2e822962c0d21 100644 --- a/src/sinks/azure_logs_ingestion/tests.rs +++ b/src/sinks/azure_logs_ingestion/tests.rs @@ -5,8 +5,13 @@ use hyper::body; use tokio::time::timeout; use vector_lib::config::log_schema; -use azure_core::credentials::TokenCredential; -use azure_identity::{ClientSecretCredential, ClientSecretCredentialOptions, TokenCredentialOptions}; +use azure_core::credentials::{AccessToken, TokenCredential}; +use azure_core::date::OffsetDateTime; +use azure_identity::{ + ClientSecretCredential, + ClientSecretCredentialOptions, + TokenCredentialOptions, +}; use super::config::AzureLogsIngestionConfig; @@ -14,7 +19,10 @@ use crate::{ event::LogEvent, sinks::prelude::*, test_util::{ - components::{run_and_assert_sink_compliance, SINK_TAGS}, + components::{ + run_and_assert_sink_compliance, + SINK_TAGS, + }, http::spawn_blackhole_http_server, }, }; @@ -78,7 +86,7 @@ fn insert_timestamp_kv(log: &mut LogEvent) -> (String, String) { #[tokio::test] async fn correct_request() { - // We need to run our own mock OAuth endpoint as well + // Other tests can use `create_mock_credential`, we're going to run this end-to-end test with our own mock OAuth endpoint as well let (authority_tx, mut _authority_rx) = tokio::sync::mpsc::channel(1); let mock_token_authority = spawn_blackhole_http_server(move |request| { let authority_tx = authority_tx.clone(); @@ -116,14 +124,6 @@ async fn correct_request() { println!("Created ClientSecretCredential"); - println!("Initial access token: {:?}", - credential.get_token( - &["https://monitor.azure.com/.default"], - None - ).await - .expect("failed to get initial access token") - ); - let config: AzureLogsIngestionConfig = toml::from_str( r#" endpoint = "http://localhost:9001" @@ -202,3 +202,82 @@ async fn correct_request() { ); } + +fn create_mock_credential() -> impl TokenCredential { + #[derive(Debug)] + struct MockCredential; + + #[async_trait::async_trait] + impl TokenCredential for MockCredential { + async fn get_token( + &self, + _scopes: &[&str], + _options: Option, + ) -> azure_core::Result { + Ok(AccessToken::new( + "mock-access-token".to_string(), + OffsetDateTime::now_utc() + Duration::from_hours(1), + )) + } + } + + MockCredential +} + +#[tokio::test] +async fn mock_healthcheck_with_403_response() { + + let config: AzureLogsIngestionConfig = toml::from_str( + r#" + endpoint = "http://localhost:9001" + dcr_immutable_id = "dcr-00000000000000000000000000000000" + stream_name = "Custom-UnitTest" + + [auth] + azure_tenant_id = "00000000-0000-0000-0000-000000000000" + azure_client_id = "mock-client-id" + azure_client_secret = "mock-client-secret" + "#) + .unwrap(); + + let mut log1 = [("message", "hello")].iter().copied().collect::(); + let (_timestamp_key1, _timestamp_value1) = insert_timestamp_kv(&mut log1); + + let (endpoint_tx, _endpoint_rx) = tokio::sync::mpsc::channel(1); + let mock_endpoint = spawn_blackhole_http_server(move |request| { + let endpoint_tx = endpoint_tx.clone(); + async move { + endpoint_tx.send(request).await.unwrap(); + let body = serde_json::json!({ + "error": "bla", + }).to_string(); + + Ok(Response::builder() + .status(403) + .header("Content-Type", "application/json") + .body(body.into()) + .unwrap()) + } + }) + .await; + + let context = SinkContext::default(); + let credential = std::sync::Arc::new(create_mock_credential()); + + let (_sink, healthcheck) = config + .build_inner( + context, + mock_endpoint.into(), + config.dcr_immutable_id.clone(), + config.stream_name.clone(), + credential, + config.token_scope.clone(), + config.timestamp_field.clone(), + ) + .await + .unwrap(); + + let hc_err = healthcheck.await.unwrap_err(); + let err_str = hc_err.to_string(); + assert!(err_str.contains("Forbidden"), "Healthcheck error does not contain 'Forbidden': {}", err_str); +} \ No newline at end of file From 5654a3198eeb5ac9a2a5794ed1cd5413da573370 Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Sun, 14 Dec 2025 02:58:54 +0000 Subject: [PATCH 30/40] improve error messages Signed-off-by: Jed Laundry --- src/sinks/azure_logs_ingestion/config.rs | 20 +++++++++++++++++--- src/sinks/azure_logs_ingestion/service.rs | 5 ++--- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/src/sinks/azure_logs_ingestion/config.rs b/src/sinks/azure_logs_ingestion/config.rs index 8b32de24a6446..e3584a784b4aa 100644 --- a/src/sinks/azure_logs_ingestion/config.rs +++ b/src/sinks/azure_logs_ingestion/config.rs @@ -186,6 +186,21 @@ impl AzureAuthentication { azure_client_id, azure_client_secret, } => { + if azure_tenant_id == "" { + return Err(Error::with_message(ErrorKind::Credential, || { + format!("`auth.azure_tenant_id` is blank; either use `auth.azure_credential_kind`, or provide tenant ID, client ID, and secret.") + })) + } + if azure_client_id == "" { + return Err(Error::with_message(ErrorKind::Credential, || { + format!("`auth.azure_client_id` is blank; either use `auth.azure_credential_kind`, or provide tenant ID, client ID, and secret.") + })) + } + if azure_client_secret.inner() == "" { + return Err(Error::with_message(ErrorKind::Credential, || { + format!("`auth.azure_client_secret` is blank; either use `auth.azure_credential_kind`, or provide tenant ID, client ID, and secret.") + })) + } let secret: String = azure_client_secret.inner().into(); let credential = ClientSecretCredential::new( &azure_tenant_id.clone(), @@ -206,7 +221,7 @@ impl AzureAuthentication { azure_credential_kinds::WORKLOAD_IDENTITY => WorkloadIdentityCredential::new(None)?, _ => { return Err(Error::with_message(ErrorKind::Credential, || { - format!("unknown/unsupported azure_credential_kind `{}`", azure_credential_kind) + format!("`auth.azure_credential_kind` `{azure_credential_kind}` is unknown/unsupported") })) } }; @@ -276,8 +291,7 @@ impl SinkConfig for AzureLogsIngestionConfig { async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { let endpoint: UriSerde = self.endpoint.parse()?; - let credential: Arc = self.auth.credential().await - .expect("Failed to create credential"); + let credential: Arc = self.auth.credential().await?; self.build_inner( cx, diff --git a/src/sinks/azure_logs_ingestion/service.rs b/src/sinks/azure_logs_ingestion/service.rs index 07f1d63cb9212..883dcaf357243 100644 --- a/src/sinks/azure_logs_ingestion/service.rs +++ b/src/sinks/azure_logs_ingestion/service.rs @@ -123,8 +123,7 @@ impl AzureLogsIngestionService { let access_token: AccessToken = self .credential .get_token(&[&self.token_scope], None) - .await - .expect("failed to get access token from credential"); + .await?; let bearer = format!("Bearer {}", access_token.token.secret()); @@ -157,7 +156,7 @@ impl AzureLogsIngestionService { } if res.status() == StatusCode::NOT_FOUND { - return Err("Azure returned 404 Not Found. Either the URL provided is incorrect, or the request is too large".into()); + return Err("Azure returned 404 Not Found. Either the URL provided is incorrect, or the request is too large.".into()); } if res.status() == StatusCode::BAD_REQUEST { From 18ba4f3a93a9dade7c1588d28e4fb8368546d970 Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Sun, 14 Dec 2025 04:09:24 +0000 Subject: [PATCH 31/40] further config tests Signed-off-by: Jed Laundry --- src/sinks/azure_logs_ingestion/tests.rs | 75 +++++++++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/src/sinks/azure_logs_ingestion/tests.rs b/src/sinks/azure_logs_ingestion/tests.rs index 2e822962c0d21..c8b5526d6273a 100644 --- a/src/sinks/azure_logs_ingestion/tests.rs +++ b/src/sinks/azure_logs_ingestion/tests.rs @@ -33,6 +33,52 @@ fn generate_config() { crate::test_util::test_generate_config::(); } +#[tokio::test] +async fn basic_config_error_with_no_auth() { + let config: AzureLogsIngestionConfig = toml::from_str::( + r#" + endpoint = "https://my-dce-5kyl.eastus-1.ingest.monitor.azure.com" + dcr_immutable_id = "dcr-00000000000000000000000000000000" + stream_name = "Custom-UnitTest" + "#) + .expect("Config parsing failed"); + + assert_eq!(config.endpoint, "https://my-dce-5kyl.eastus-1.ingest.monitor.azure.com"); + assert_eq!(config.dcr_immutable_id, "dcr-00000000000000000000000000000000"); + assert_eq!(config.stream_name, "Custom-UnitTest"); + assert_eq!(config.token_scope, "https://monitor.azure.com/.default"); + assert_eq!(config.timestamp_field, "TimeGenerated"); + + match &config.auth { + crate::sinks::azure_logs_ingestion::config::AzureAuthentication::ClientSecretCredential { + azure_tenant_id, + azure_client_id, + azure_client_secret, + } => { + assert_eq!(azure_tenant_id, ""); + assert_eq!(azure_client_id, ""); + let secret: String = azure_client_secret.inner().into(); + assert_eq!(secret, ""); + } + _ => panic!("Expected ClientSecretCredential variant"), + } + + let cx = SinkContext::default(); + let sink = config.build(cx).await; + match sink { + Ok(_) => panic!("Config build should have errored due to missing auth info"), + Err(e) => { + let err_str = e.to_string(); + assert!( + err_str.contains("`auth.azure_tenant_id` is blank"), + "Config build did not complain about azure_tenant_id being blank: {}", + err_str + ); + } + } + +} + #[test] fn basic_config_with_client_credentials() { let config: AzureLogsIngestionConfig = toml::from_str::( @@ -69,6 +115,35 @@ fn basic_config_with_client_credentials() { } } +#[test] +fn basic_config_with_managed_identity() { + let config: AzureLogsIngestionConfig = toml::from_str::( + r#" + endpoint = "https://my-dce-5kyl.eastus-1.ingest.monitor.azure.com" + dcr_immutable_id = "dcr-00000000000000000000000000000000" + stream_name = "Custom-UnitTest" + + [auth] + azure_credential_kind = "managedidentity" + "#) + .expect("Config parsing failed"); + + assert_eq!(config.endpoint, "https://my-dce-5kyl.eastus-1.ingest.monitor.azure.com"); + assert_eq!(config.dcr_immutable_id, "dcr-00000000000000000000000000000000"); + assert_eq!(config.stream_name, "Custom-UnitTest"); + assert_eq!(config.token_scope, "https://monitor.azure.com/.default"); + assert_eq!(config.timestamp_field, "TimeGenerated"); + + match &config.auth { + crate::sinks::azure_logs_ingestion::config::AzureAuthentication::SpecificAzureCredential { + azure_credential_kind, + } => { + assert_eq!(azure_credential_kind, "managedidentity"); + } + _ => panic!("Expected SpecificAzureCredential variant"), + } +} + // TODO test config with ManagedIdentity (will need to mock env vars...) fn insert_timestamp_kv(log: &mut LogEvent) -> (String, String) { From 67652b4a7d00a81a182c8bc660bef1f6966ca06e Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Sun, 14 Dec 2025 17:04:04 +0000 Subject: [PATCH 32/40] add user-assigned managed identity support Signed-off-by: Jed Laundry --- src/sinks/azure_logs_ingestion/config.rs | 19 +++++++++++++++++-- src/sinks/azure_logs_ingestion/tests.rs | 1 + 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/src/sinks/azure_logs_ingestion/config.rs b/src/sinks/azure_logs_ingestion/config.rs index e3584a784b4aa..a9ee958a8bf59 100644 --- a/src/sinks/azure_logs_ingestion/config.rs +++ b/src/sinks/azure_logs_ingestion/config.rs @@ -10,7 +10,9 @@ use azure_identity::{ AzureCliCredential, ClientSecretCredential, ManagedIdentityCredential, - WorkloadIdentityCredential, + ManagedIdentityCredentialOptions, + UserAssignedId, + WorkloadIdentityCredential }; use vector_lib::{ schema, @@ -172,6 +174,11 @@ pub enum AzureAuthentication { #[configurable(metadata(docs::examples = "managedidentity"))] #[configurable(metadata(docs::examples = "workloadidentity"))] azure_credential_kind: String, + + /// The User Assigned Managed Identity (Client ID) to use. Only applicable when `azure_credential_kind` is `managedidentity`. + #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))] + #[serde(default, skip_serializing_if = "Option::is_none")] + user_assigned_managed_identity_id: Option, } } @@ -213,11 +220,19 @@ impl AzureAuthentication { Self::SpecificAzureCredential { azure_credential_kind, + user_assigned_managed_identity_id, } => { let credential: Arc = match azure_credential_kind.replace(' ', "").to_lowercase().as_str() { #[cfg(not(target_arch = "wasm32"))] azure_credential_kinds::AZURE_CLI => AzureCliCredential::new(None)?, - azure_credential_kinds::MANAGED_IDENTITY => ManagedIdentityCredential::new(None)?, + azure_credential_kinds::MANAGED_IDENTITY => { + let mut options = ManagedIdentityCredentialOptions::default(); + if user_assigned_managed_identity_id.is_some() { + options.user_assigned_id = Some(UserAssignedId::ClientId(user_assigned_managed_identity_id.clone().unwrap())); + } + + ManagedIdentityCredential::new(Some(options))? + } azure_credential_kinds::WORKLOAD_IDENTITY => WorkloadIdentityCredential::new(None)?, _ => { return Err(Error::with_message(ErrorKind::Credential, || { diff --git a/src/sinks/azure_logs_ingestion/tests.rs b/src/sinks/azure_logs_ingestion/tests.rs index c8b5526d6273a..769857fb33198 100644 --- a/src/sinks/azure_logs_ingestion/tests.rs +++ b/src/sinks/azure_logs_ingestion/tests.rs @@ -137,6 +137,7 @@ fn basic_config_with_managed_identity() { match &config.auth { crate::sinks::azure_logs_ingestion::config::AzureAuthentication::SpecificAzureCredential { azure_credential_kind, + user_assigned_managed_identity_id: _, } => { assert_eq!(azure_credential_kind, "managedidentity"); } From f3b8c4d451e34461708cecd4ad363cba28b21eb0 Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Sun, 14 Dec 2025 17:44:34 +0000 Subject: [PATCH 33/40] add federated managed identity Signed-off-by: Jed Laundry --- src/sinks/azure_logs_ingestion/config.rs | 68 +++++++++++++++++-- src/sinks/azure_logs_ingestion/tests.rs | 2 + .../sinks/generated/azure_logs_ingestion.cue | 17 ++++- 3 files changed, 79 insertions(+), 8 deletions(-) diff --git a/src/sinks/azure_logs_ingestion/config.rs b/src/sinks/azure_logs_ingestion/config.rs index a9ee958a8bf59..7b742c497ee38 100644 --- a/src/sinks/azure_logs_ingestion/config.rs +++ b/src/sinks/azure_logs_ingestion/config.rs @@ -7,12 +7,7 @@ use azure_core::{ }; use azure_identity::{ - AzureCliCredential, - ClientSecretCredential, - ManagedIdentityCredential, - ManagedIdentityCredentialOptions, - UserAssignedId, - WorkloadIdentityCredential + AzureCliCredential, ClientAssertion, ClientAssertionCredential, ClientSecretCredential, ManagedIdentityCredential, ManagedIdentityCredentialOptions, UserAssignedId, WorkloadIdentityCredential }; use vector_lib::{ schema, @@ -142,6 +137,7 @@ mod azure_credential_kinds { #[cfg(not(target_arch = "wasm32"))] pub const AZURE_CLI: &str = "azurecli"; pub const MANAGED_IDENTITY: &str = "managedidentity"; + pub const MANAGED_IDENTITY_CLIENT_ASSERTION: &str = "managedidentityclientassertion"; pub const WORKLOAD_IDENTITY: &str = "workloadidentity"; } @@ -172,13 +168,44 @@ pub enum AzureAuthentication { /// The kind of Azure credential to use. #[configurable(metadata(docs::examples = "azurecli"))] #[configurable(metadata(docs::examples = "managedidentity"))] + #[configurable(metadata(docs::examples = "managedidentityclientassertion"))] #[configurable(metadata(docs::examples = "workloadidentity"))] azure_credential_kind: String, - /// The User Assigned Managed Identity (Client ID) to use. Only applicable when `azure_credential_kind` is `managedidentity`. + /// The User Assigned Managed Identity (Client ID) to use. Only applicable when `azure_credential_kind` is `managedidentity` or `managedidentityclientassertion`. #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))] #[serde(default, skip_serializing_if = "Option::is_none")] user_assigned_managed_identity_id: Option, + + /// The target Tenant ID to use. Only applicable when `azure_credential_kind` is `managedidentityclientassertion`. + #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))] + #[serde(default, skip_serializing_if = "Option::is_none")] + client_assertion_tenant_id: Option, + + /// The target Client ID to use. Only applicable when `azure_credential_kind` is `managedidentityclientassertion`. + #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))] + #[serde(default, skip_serializing_if = "Option::is_none")] + client_assertion_client_id: Option, + } +} + +#[derive(Debug)] +struct ManagedIdentityClientAssertion { + credential: Arc, + scope: String, +} + +#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] +#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] +impl ClientAssertion for ManagedIdentityClientAssertion { + async fn secret(&self) -> azure_core::Result { + Ok(self + .credential + .get_token(&[&self.scope], None) + .await? + .token + .secret() + .to_string()) } } @@ -221,6 +248,8 @@ impl AzureAuthentication { Self::SpecificAzureCredential { azure_credential_kind, user_assigned_managed_identity_id, + client_assertion_tenant_id, + client_assertion_client_id, } => { let credential: Arc = match azure_credential_kind.replace(' ', "").to_lowercase().as_str() { #[cfg(not(target_arch = "wasm32"))] @@ -233,6 +262,31 @@ impl AzureAuthentication { ManagedIdentityCredential::new(Some(options))? } + azure_credential_kinds::MANAGED_IDENTITY_CLIENT_ASSERTION => { + if client_assertion_tenant_id.is_none() || client_assertion_client_id.is_none() { + return Err(Error::with_message(ErrorKind::Credential, || { + format!("`auth.client_assertion_tenant_id` and `auth.client_assertion_client_id` must be set when using `auth.azure_credential_kind` of `managedidentityclientassertion`") + })) + } + + let mut options = ManagedIdentityCredentialOptions::default(); + if user_assigned_managed_identity_id.is_some() { + options.user_assigned_id = Some(UserAssignedId::ClientId(user_assigned_managed_identity_id.clone().unwrap())); + } + let msi: Arc = ManagedIdentityCredential::new(Some(options))?; + let assertion = ManagedIdentityClientAssertion { + credential: msi, + // Future: make this configurable for sovereign clouds? (no way to test...) + scope: "api://AzureADTokenExchange/.default".to_string(), + }; + + ClientAssertionCredential::new( + client_assertion_tenant_id.clone().unwrap(), + client_assertion_client_id.clone().unwrap(), + assertion, + None, + )? + } azure_credential_kinds::WORKLOAD_IDENTITY => WorkloadIdentityCredential::new(None)?, _ => { return Err(Error::with_message(ErrorKind::Credential, || { diff --git a/src/sinks/azure_logs_ingestion/tests.rs b/src/sinks/azure_logs_ingestion/tests.rs index 769857fb33198..cd11e2ddd77c3 100644 --- a/src/sinks/azure_logs_ingestion/tests.rs +++ b/src/sinks/azure_logs_ingestion/tests.rs @@ -138,6 +138,8 @@ fn basic_config_with_managed_identity() { crate::sinks::azure_logs_ingestion::config::AzureAuthentication::SpecificAzureCredential { azure_credential_kind, user_assigned_managed_identity_id: _, + client_assertion_tenant_id: _, + client_assertion_client_id: _, } => { assert_eq!(azure_credential_kind, "managedidentity"); } diff --git a/website/cue/reference/components/sinks/generated/azure_logs_ingestion.cue b/website/cue/reference/components/sinks/generated/azure_logs_ingestion.cue index e8ac51c090ba3..90de0a6676c57 100644 --- a/website/cue/reference/components/sinks/generated/azure_logs_ingestion.cue +++ b/website/cue/reference/components/sinks/generated/azure_logs_ingestion.cue @@ -50,7 +50,7 @@ generated: components: sinks: azure_logs_ingestion: configuration: { azure_credential_kind: { description: "The kind of Azure credential to use." required: true - type: string: examples: ["azurecli", "managedidentity", "workloadidentity"] + type: string: examples: ["azurecli", "managedidentity", "managedidentityclientassertion", "workloadidentity"] } azure_tenant_id: { description: "The [Azure Tenant ID][azure_tenant_id]." @@ -60,6 +60,21 @@ generated: components: sinks: azure_logs_ingestion: configuration: { examples: ["00000000-0000-0000-0000-000000000000"] } } + client_assertion_client_id: { + description: "The target Client ID to use. Only applicable when `azure_credential_kind` is `managedidentityclientassertion`." + required: false + type: string: examples: ["00000000-0000-0000-0000-000000000000"] + } + client_assertion_tenant_id: { + description: "The target Tenant ID to use. Only applicable when `azure_credential_kind` is `managedidentityclientassertion`." + required: false + type: string: examples: ["00000000-0000-0000-0000-000000000000"] + } + user_assigned_managed_identity_id: { + description: "The User Assigned Managed Identity (Client ID) to use. Only applicable when `azure_credential_kind` is `managedidentity` or `managedidentityclientassertion`." + required: false + type: string: examples: ["00000000-0000-0000-0000-000000000000"] + } } } batch: { From 58b291dff41ad409a9fe2dcdf3083909d7e3cb40 Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Sun, 14 Dec 2025 17:45:44 +0000 Subject: [PATCH 34/40] update docs, spelling --- .github/actions/spelling/expect.txt | 3 +++ .../reference/components/sinks/azure_logs_ingestion.cue | 8 +++++--- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/.github/actions/spelling/expect.txt b/.github/actions/spelling/expect.txt index 93327ee306768..1daec0b4838ba 100644 --- a/.github/actions/spelling/expect.txt +++ b/.github/actions/spelling/expect.txt @@ -331,6 +331,8 @@ majorly makecache Makefiles mallocs +managedidentity +managedidentityclientassertion markdownify marketo maxbin @@ -653,6 +655,7 @@ wkkmmawf wmem woooooow woothee +workloadidentity workstreams writeback wtcache diff --git a/website/cue/reference/components/sinks/azure_logs_ingestion.cue b/website/cue/reference/components/sinks/azure_logs_ingestion.cue index 7e53b0e72d9d4..f3f34deef59ba 100644 --- a/website/cue/reference/components/sinks/azure_logs_ingestion.cue +++ b/website/cue/reference/components/sinks/azure_logs_ingestion.cue @@ -6,9 +6,11 @@ components: sinks: azure_logs_ingestion: { description: """ This sink uses the Azure Monitor Logs Ingestion API to send log events to a Log Analytics Workspace. - The `azure_identity` crate is used for authentication, which supports the standard Azure authentication types - (Workload Identity, Managed Identity, Azure CLI, Service Principal with Certificate or Secret, etc.) through - environment variables. + The `azure_identity` crate is used for authentication, which supports standard Azure authentication types + (Service Principal, Managed Identity, Workload Identity, Azure CLI, etc.). However, because this crate + implements `DefaultAzureCredential` differently to other language SDKs, the authentication type must be + explicitly configured: either using `azure_tenant_id`, `azure_client_id`, `azure_client_secret`, or selecting + one of the `azure_credential_kind` options. """ classes: { From dd5be48f0e8d075bfc02307dc26949f0fabb7851 Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Tue, 16 Dec 2025 00:20:46 +0000 Subject: [PATCH 35/40] add deprecation note to the changelog Signed-off-by: Jed Laundry --- changelog.d/22912_add_azure_logs_ingestion.feature.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/changelog.d/22912_add_azure_logs_ingestion.feature.md b/changelog.d/22912_add_azure_logs_ingestion.feature.md index 0cc46d5e48097..38cc6a8c169e4 100644 --- a/changelog.d/22912_add_azure_logs_ingestion.feature.md +++ b/changelog.d/22912_add_azure_logs_ingestion.feature.md @@ -1,3 +1,5 @@ Add support for the Azure Monitor Logs Ingestion API through a new `azure_logs_ingestion` sink. +The `azure_monitor_logs` sink is now deprecated, and current users will need to migrate to `azure_logs_ingestion` before Microsoft end support for the old Data Collector API (currently scheduled for September 2026). + authors: jlaundry From ba6671c6e4362195f00ddfce9f0eec60578973bd Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Tue, 16 Dec 2025 19:03:39 +0000 Subject: [PATCH 36/40] clippy changes Signed-off-by: Jed Laundry --- src/sinks/azure_logs_ingestion/config.rs | 15 ++++++++------- src/sinks/azure_logs_ingestion/tests.rs | 6 +----- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/src/sinks/azure_logs_ingestion/config.rs b/src/sinks/azure_logs_ingestion/config.rs index 7b742c497ee38..d4f6216379ae2 100644 --- a/src/sinks/azure_logs_ingestion/config.rs +++ b/src/sinks/azure_logs_ingestion/config.rs @@ -220,19 +220,19 @@ impl AzureAuthentication { azure_client_id, azure_client_secret, } => { - if azure_tenant_id == "" { + if azure_tenant_id.is_empty() { return Err(Error::with_message(ErrorKind::Credential, || { - format!("`auth.azure_tenant_id` is blank; either use `auth.azure_credential_kind`, or provide tenant ID, client ID, and secret.") + "`auth.azure_tenant_id` is blank; either use `auth.azure_credential_kind`, or provide tenant ID, client ID, and secret.".to_string() })) } - if azure_client_id == "" { + if azure_client_id.is_empty() { return Err(Error::with_message(ErrorKind::Credential, || { - format!("`auth.azure_client_id` is blank; either use `auth.azure_credential_kind`, or provide tenant ID, client ID, and secret.") + "`auth.azure_client_id` is blank; either use `auth.azure_credential_kind`, or provide tenant ID, client ID, and secret.".to_string() })) } - if azure_client_secret.inner() == "" { + if azure_client_secret.inner().is_empty() { return Err(Error::with_message(ErrorKind::Credential, || { - format!("`auth.azure_client_secret` is blank; either use `auth.azure_credential_kind`, or provide tenant ID, client ID, and secret.") + "`auth.azure_client_secret` is blank; either use `auth.azure_credential_kind`, or provide tenant ID, client ID, and secret.".to_string() })) } let secret: String = azure_client_secret.inner().into(); @@ -265,7 +265,7 @@ impl AzureAuthentication { azure_credential_kinds::MANAGED_IDENTITY_CLIENT_ASSERTION => { if client_assertion_tenant_id.is_none() || client_assertion_client_id.is_none() { return Err(Error::with_message(ErrorKind::Credential, || { - format!("`auth.client_assertion_tenant_id` and `auth.client_assertion_client_id` must be set when using `auth.azure_credential_kind` of `managedidentityclientassertion`") + "`auth.client_assertion_tenant_id` and `auth.client_assertion_client_id` must be set when using `auth.azure_credential_kind` of `managedidentityclientassertion`".to_string() })) } @@ -301,6 +301,7 @@ impl AzureAuthentication { } impl AzureLogsIngestionConfig { + #[allow(clippy::too_many_arguments)] pub(super) async fn build_inner( &self, cx: SinkContext, diff --git a/src/sinks/azure_logs_ingestion/tests.rs b/src/sinks/azure_logs_ingestion/tests.rs index cd11e2ddd77c3..f73a06a590e7c 100644 --- a/src/sinks/azure_logs_ingestion/tests.rs +++ b/src/sinks/azure_logs_ingestion/tests.rs @@ -184,8 +184,6 @@ async fn correct_request() { }) .await; - println!("Mock token authority running at {}", mock_token_authority.to_string()); - let mut credential_options = TokenCredentialOptions::default(); //credential_options.set_authority_host("http://127.0.0.1:9001".into()); credential_options.set_authority_host(mock_token_authority.to_string()); @@ -195,13 +193,11 @@ async fn correct_request() { "mock-client-id".into(), "mock-client-secret".into(), Some(ClientSecretCredentialOptions { - credential_options: credential_options, + credential_options, }), ) .expect("failed to create ClientSecretCredential"); - println!("Created ClientSecretCredential"); - let config: AzureLogsIngestionConfig = toml::from_str( r#" endpoint = "http://localhost:9001" From a518e1546972e60bf27d881e25b2d1066fcc8abd Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Tue, 6 Jan 2026 17:26:32 +0000 Subject: [PATCH 37/40] check-fmt changes Signed-off-by: Jed Laundry --- src/sinks/azure_logs_ingestion/config.rs | 66 +++++++------ src/sinks/azure_logs_ingestion/service.rs | 7 +- src/sinks/azure_logs_ingestion/sink.rs | 2 +- src/sinks/azure_logs_ingestion/tests.rs | 113 +++++++++++++--------- 4 files changed, 105 insertions(+), 83 deletions(-) diff --git a/src/sinks/azure_logs_ingestion/config.rs b/src/sinks/azure_logs_ingestion/config.rs index d4f6216379ae2..a867bf9b67472 100644 --- a/src/sinks/azure_logs_ingestion/config.rs +++ b/src/sinks/azure_logs_ingestion/config.rs @@ -1,26 +1,21 @@ use std::sync::Arc; use azure_core::credentials::TokenCredential; -use azure_core::{ - error::ErrorKind, - Error, -}; +use azure_core::{Error, error::ErrorKind}; use azure_identity::{ - AzureCliCredential, ClientAssertion, ClientAssertionCredential, ClientSecretCredential, ManagedIdentityCredential, ManagedIdentityCredentialOptions, UserAssignedId, WorkloadIdentityCredential -}; -use vector_lib::{ - schema, - configurable::configurable_component, - sensitive_string::SensitiveString, + AzureCliCredential, ClientAssertion, ClientAssertionCredential, ClientSecretCredential, + ManagedIdentityCredential, ManagedIdentityCredentialOptions, UserAssignedId, + WorkloadIdentityCredential, }; +use vector_lib::{configurable::configurable_component, schema, sensitive_string::SensitiveString}; use vrl::value::Kind; use crate::{ - http::{get_http_scheme_from_uri, HttpClient}, + http::{HttpClient, get_http_scheme_from_uri}, sinks::{ prelude::*, - util::{http::HttpStatusRetryLogic, RealtimeSizeBasedDefaultBatchSettings, UriSerde}, + util::{RealtimeSizeBasedDefaultBatchSettings, UriSerde, http::HttpStatusRetryLogic}, }, }; @@ -186,7 +181,7 @@ pub enum AzureAuthentication { #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))] #[serde(default, skip_serializing_if = "Option::is_none")] client_assertion_client_id: Option, - } + }, } #[derive(Debug)] @@ -211,9 +206,7 @@ impl ClientAssertion for ManagedIdentityClientAssertion { impl AzureAuthentication { /// Returns the provider for the credentials based on the authentication mechanism chosen. - pub async fn credential( - &self, - ) -> azure_core::Result> { + pub async fn credential(&self) -> azure_core::Result> { match self { Self::ClientSecretCredential { azure_tenant_id, @@ -223,17 +216,17 @@ impl AzureAuthentication { if azure_tenant_id.is_empty() { return Err(Error::with_message(ErrorKind::Credential, || { "`auth.azure_tenant_id` is blank; either use `auth.azure_credential_kind`, or provide tenant ID, client ID, and secret.".to_string() - })) + })); } if azure_client_id.is_empty() { return Err(Error::with_message(ErrorKind::Credential, || { "`auth.azure_client_id` is blank; either use `auth.azure_credential_kind`, or provide tenant ID, client ID, and secret.".to_string() - })) + })); } if azure_client_secret.inner().is_empty() { return Err(Error::with_message(ErrorKind::Credential, || { "`auth.azure_client_secret` is blank; either use `auth.azure_credential_kind`, or provide tenant ID, client ID, and secret.".to_string() - })) + })); } let secret: String = azure_client_secret.inner().into(); let credential = ClientSecretCredential::new( @@ -251,29 +244,40 @@ impl AzureAuthentication { client_assertion_tenant_id, client_assertion_client_id, } => { - let credential: Arc = match azure_credential_kind.replace(' ', "").to_lowercase().as_str() { + let credential: Arc = match azure_credential_kind + .replace(' ', "") + .to_lowercase() + .as_str() + { #[cfg(not(target_arch = "wasm32"))] azure_credential_kinds::AZURE_CLI => AzureCliCredential::new(None)?, azure_credential_kinds::MANAGED_IDENTITY => { let mut options = ManagedIdentityCredentialOptions::default(); if user_assigned_managed_identity_id.is_some() { - options.user_assigned_id = Some(UserAssignedId::ClientId(user_assigned_managed_identity_id.clone().unwrap())); + options.user_assigned_id = Some(UserAssignedId::ClientId( + user_assigned_managed_identity_id.clone().unwrap(), + )); } - + ManagedIdentityCredential::new(Some(options))? } azure_credential_kinds::MANAGED_IDENTITY_CLIENT_ASSERTION => { - if client_assertion_tenant_id.is_none() || client_assertion_client_id.is_none() { + if client_assertion_tenant_id.is_none() + || client_assertion_client_id.is_none() + { return Err(Error::with_message(ErrorKind::Credential, || { "`auth.client_assertion_tenant_id` and `auth.client_assertion_client_id` must be set when using `auth.azure_credential_kind` of `managedidentityclientassertion`".to_string() - })) + })); } let mut options = ManagedIdentityCredentialOptions::default(); if user_assigned_managed_identity_id.is_some() { - options.user_assigned_id = Some(UserAssignedId::ClientId(user_assigned_managed_identity_id.clone().unwrap())); + options.user_assigned_id = Some(UserAssignedId::ClientId( + user_assigned_managed_identity_id.clone().unwrap(), + )); } - let msi: Arc = ManagedIdentityCredential::new(Some(options))?; + let msi: Arc = + ManagedIdentityCredential::new(Some(options))?; let assertion = ManagedIdentityClientAssertion { credential: msi, // Future: make this configurable for sovereign clouds? (no way to test...) @@ -287,11 +291,15 @@ impl AzureAuthentication { None, )? } - azure_credential_kinds::WORKLOAD_IDENTITY => WorkloadIdentityCredential::new(None)?, + azure_credential_kinds::WORKLOAD_IDENTITY => { + WorkloadIdentityCredential::new(None)? + } _ => { return Err(Error::with_message(ErrorKind::Credential, || { - format!("`auth.azure_credential_kind` `{azure_credential_kind}` is unknown/unsupported") - })) + format!( + "`auth.azure_credential_kind` `{azure_credential_kind}` is unknown/unsupported" + ) + })); } }; Ok(credential) diff --git a/src/sinks/azure_logs_ingestion/service.rs b/src/sinks/azure_logs_ingestion/service.rs index 883dcaf357243..85990aa7c93f4 100644 --- a/src/sinks/azure_logs_ingestion/service.rs +++ b/src/sinks/azure_logs_ingestion/service.rs @@ -2,15 +2,12 @@ use std::sync::Arc; use std::sync::LazyLock; use std::task::{Context, Poll}; -use azure_core::credentials::{ - AccessToken, - TokenCredential, -}; +use azure_core::credentials::{AccessToken, TokenCredential}; use bytes::Bytes; use http::{ - header::{self, HeaderMap}, HeaderValue, Request, StatusCode, Uri, + header::{self, HeaderMap}, }; use hyper::Body; use tracing::Instrument; diff --git a/src/sinks/azure_logs_ingestion/sink.rs b/src/sinks/azure_logs_ingestion/sink.rs index 6168ef51683a7..5493e4821f433 100644 --- a/src/sinks/azure_logs_ingestion/sink.rs +++ b/src/sinks/azure_logs_ingestion/sink.rs @@ -1,7 +1,7 @@ use std::{fmt::Debug, io}; use bytes::Bytes; -use vector_lib::codecs::{encoding::Framer, CharacterDelimitedEncoder, JsonSerializerConfig}; +use vector_lib::codecs::{CharacterDelimitedEncoder, JsonSerializerConfig, encoding::Framer}; use vector_lib::lookup::PathPrefix; use crate::sinks::prelude::*; diff --git a/src/sinks/azure_logs_ingestion/tests.rs b/src/sinks/azure_logs_ingestion/tests.rs index f73a06a590e7c..e1a6b13d1a694 100644 --- a/src/sinks/azure_logs_ingestion/tests.rs +++ b/src/sinks/azure_logs_ingestion/tests.rs @@ -1,16 +1,14 @@ -use std::time::Duration; use futures::stream; use http::Response; use hyper::body; +use std::time::Duration; use tokio::time::timeout; use vector_lib::config::log_schema; use azure_core::credentials::{AccessToken, TokenCredential}; use azure_core::date::OffsetDateTime; use azure_identity::{ - ClientSecretCredential, - ClientSecretCredentialOptions, - TokenCredentialOptions, + ClientSecretCredential, ClientSecretCredentialOptions, TokenCredentialOptions, }; use super::config::AzureLogsIngestionConfig; @@ -19,15 +17,11 @@ use crate::{ event::LogEvent, sinks::prelude::*, test_util::{ - components::{ - run_and_assert_sink_compliance, - SINK_TAGS, - }, + components::{SINK_TAGS, run_and_assert_sink_compliance}, http::spawn_blackhole_http_server, }, }; - #[test] fn generate_config() { crate::test_util::test_generate_config::(); @@ -40,11 +34,18 @@ async fn basic_config_error_with_no_auth() { endpoint = "https://my-dce-5kyl.eastus-1.ingest.monitor.azure.com" dcr_immutable_id = "dcr-00000000000000000000000000000000" stream_name = "Custom-UnitTest" - "#) - .expect("Config parsing failed"); - - assert_eq!(config.endpoint, "https://my-dce-5kyl.eastus-1.ingest.monitor.azure.com"); - assert_eq!(config.dcr_immutable_id, "dcr-00000000000000000000000000000000"); + "#, + ) + .expect("Config parsing failed"); + + assert_eq!( + config.endpoint, + "https://my-dce-5kyl.eastus-1.ingest.monitor.azure.com" + ); + assert_eq!( + config.dcr_immutable_id, + "dcr-00000000000000000000000000000000" + ); assert_eq!(config.stream_name, "Custom-UnitTest"); assert_eq!(config.token_scope, "https://monitor.azure.com/.default"); assert_eq!(config.timestamp_field, "TimeGenerated"); @@ -76,7 +77,6 @@ async fn basic_config_error_with_no_auth() { ); } } - } #[test] @@ -86,16 +86,23 @@ fn basic_config_with_client_credentials() { endpoint = "https://my-dce-5kyl.eastus-1.ingest.monitor.azure.com" dcr_immutable_id = "dcr-00000000000000000000000000000000" stream_name = "Custom-UnitTest" - + [auth] azure_tenant_id = "00000000-0000-0000-0000-000000000000" azure_client_id = "mock-client-id" azure_client_secret = "mock-client-secret" - "#) - .expect("Config parsing failed"); - - assert_eq!(config.endpoint, "https://my-dce-5kyl.eastus-1.ingest.monitor.azure.com"); - assert_eq!(config.dcr_immutable_id, "dcr-00000000000000000000000000000000"); + "#, + ) + .expect("Config parsing failed"); + + assert_eq!( + config.endpoint, + "https://my-dce-5kyl.eastus-1.ingest.monitor.azure.com" + ); + assert_eq!( + config.dcr_immutable_id, + "dcr-00000000000000000000000000000000" + ); assert_eq!(config.stream_name, "Custom-UnitTest"); assert_eq!(config.token_scope, "https://monitor.azure.com/.default"); assert_eq!(config.timestamp_field, "TimeGenerated"); @@ -122,14 +129,21 @@ fn basic_config_with_managed_identity() { endpoint = "https://my-dce-5kyl.eastus-1.ingest.monitor.azure.com" dcr_immutable_id = "dcr-00000000000000000000000000000000" stream_name = "Custom-UnitTest" - + [auth] azure_credential_kind = "managedidentity" - "#) - .expect("Config parsing failed"); - - assert_eq!(config.endpoint, "https://my-dce-5kyl.eastus-1.ingest.monitor.azure.com"); - assert_eq!(config.dcr_immutable_id, "dcr-00000000000000000000000000000000"); + "#, + ) + .expect("Config parsing failed"); + + assert_eq!( + config.endpoint, + "https://my-dce-5kyl.eastus-1.ingest.monitor.azure.com" + ); + assert_eq!( + config.dcr_immutable_id, + "dcr-00000000000000000000000000000000" + ); assert_eq!(config.stream_name, "Custom-UnitTest"); assert_eq!(config.token_scope, "https://monitor.azure.com/.default"); assert_eq!(config.timestamp_field, "TimeGenerated"); @@ -163,7 +177,6 @@ fn insert_timestamp_kv(log: &mut LogEvent) -> (String, String) { #[tokio::test] async fn correct_request() { - // Other tests can use `create_mock_credential`, we're going to run this end-to-end test with our own mock OAuth endpoint as well let (authority_tx, mut _authority_rx) = tokio::sync::mpsc::channel(1); let mock_token_authority = spawn_blackhole_http_server(move |request| { @@ -174,12 +187,13 @@ async fn correct_request() { "access_token": "mock-access-token", "token_type": "Bearer", "expires_in": 3600 - }).to_string(); + }) + .to_string(); Ok(Response::builder() - .header("Content-Type", "application/json") - .body(body.into()) - .unwrap()) + .header("Content-Type", "application/json") + .body(body.into()) + .unwrap()) } }) .await; @@ -192,9 +206,7 @@ async fn correct_request() { "00000000-0000-0000-0000-000000000000", "mock-client-id".into(), "mock-client-secret".into(), - Some(ClientSecretCredentialOptions { - credential_options, - }), + Some(ClientSecretCredentialOptions { credential_options }), ) .expect("failed to create ClientSecretCredential"); @@ -208,8 +220,9 @@ async fn correct_request() { azure_tenant_id = "00000000-0000-0000-0000-000000000000" azure_client_id = "mock-client-id" azure_client_secret = "mock-client-secret" - "#) - .unwrap(); + "#, + ) + .unwrap(); let mut log1 = [("message", "hello")].iter().copied().collect::(); let (_timestamp_key1, timestamp_value1) = insert_timestamp_kv(&mut log1); @@ -274,7 +287,6 @@ async fn correct_request() { &parts.uri.path_and_query().unwrap().to_string(), "/dataCollectionRules/dcr-00000000000000000000000000000000/streams/Custom-UnitTest?api-version=2023-01-01" ); - } fn create_mock_credential() -> impl TokenCredential { @@ -300,7 +312,6 @@ fn create_mock_credential() -> impl TokenCredential { #[tokio::test] async fn mock_healthcheck_with_403_response() { - let config: AzureLogsIngestionConfig = toml::from_str( r#" endpoint = "http://localhost:9001" @@ -311,8 +322,9 @@ async fn mock_healthcheck_with_403_response() { azure_tenant_id = "00000000-0000-0000-0000-000000000000" azure_client_id = "mock-client-id" azure_client_secret = "mock-client-secret" - "#) - .unwrap(); + "#, + ) + .unwrap(); let mut log1 = [("message", "hello")].iter().copied().collect::(); let (_timestamp_key1, _timestamp_value1) = insert_timestamp_kv(&mut log1); @@ -324,13 +336,14 @@ async fn mock_healthcheck_with_403_response() { endpoint_tx.send(request).await.unwrap(); let body = serde_json::json!({ "error": "bla", - }).to_string(); + }) + .to_string(); Ok(Response::builder() - .status(403) - .header("Content-Type", "application/json") - .body(body.into()) - .unwrap()) + .status(403) + .header("Content-Type", "application/json") + .body(body.into()) + .unwrap()) } }) .await; @@ -353,5 +366,9 @@ async fn mock_healthcheck_with_403_response() { let hc_err = healthcheck.await.unwrap_err(); let err_str = hc_err.to_string(); - assert!(err_str.contains("Forbidden"), "Healthcheck error does not contain 'Forbidden': {}", err_str); -} \ No newline at end of file + assert!( + err_str.contains("Forbidden"), + "Healthcheck error does not contain 'Forbidden': {}", + err_str + ); +} From 25bafe4ae3f0c5a8539f44dc2dc22bb110132f4a Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Tue, 6 Jan 2026 18:41:30 +0000 Subject: [PATCH 38/40] check-docs changes Signed-off-by: Jed Laundry --- website/cue/reference/components/sinks/azure_logs_ingestion.cue | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/website/cue/reference/components/sinks/azure_logs_ingestion.cue b/website/cue/reference/components/sinks/azure_logs_ingestion.cue index f3f34deef59ba..0b3fec1187eb0 100644 --- a/website/cue/reference/components/sinks/azure_logs_ingestion.cue +++ b/website/cue/reference/components/sinks/azure_logs_ingestion.cue @@ -71,7 +71,7 @@ components: sinks: azure_logs_ingestion: { notices: [] } - configuration: base.components.sinks.azure_logs_ingestion.configuration + configuration: generated.components.sinks.azure_logs_ingestion.configuration input: { logs: true From 94dfa6b93fea71d5c3070af12014a69cc44ac467 Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Tue, 6 Jan 2026 18:42:02 +0000 Subject: [PATCH 39/40] check-clippy changes Signed-off-by: Jed Laundry --- src/sinks/azure_logs_ingestion/service.rs | 1 + src/sinks/azure_logs_ingestion/tests.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/src/sinks/azure_logs_ingestion/service.rs b/src/sinks/azure_logs_ingestion/service.rs index 85990aa7c93f4..154646189b3e9 100644 --- a/src/sinks/azure_logs_ingestion/service.rs +++ b/src/sinks/azure_logs_ingestion/service.rs @@ -157,6 +157,7 @@ impl AzureLogsIngestionService { } if res.status() == StatusCode::BAD_REQUEST { + #[allow(deprecated)] let body_bytes: Bytes = hyper::body::to_bytes(res.into_body()).await.unwrap(); let body_string: String = String::from_utf8(body_bytes.to_vec()).unwrap(); let err_string: String = format!("Azure returned 400 Bad Request: {body_string}"); diff --git a/src/sinks/azure_logs_ingestion/tests.rs b/src/sinks/azure_logs_ingestion/tests.rs index e1a6b13d1a694..b157f7f121f79 100644 --- a/src/sinks/azure_logs_ingestion/tests.rs +++ b/src/sinks/azure_logs_ingestion/tests.rs @@ -265,6 +265,7 @@ async fn correct_request() { let (parts, body) = request.into_parts(); assert_eq!(&parts.method.to_string(), "POST"); + #[allow(deprecated)] let body = body::to_bytes(body).await.unwrap(); let body_json: serde_json::Value = serde_json::from_slice(&body[..]).unwrap(); let expected_json = serde_json::json!([ From 1dd1bca89d77c0b5e1578c2cbfb10d72180aeca3 Mon Sep 17 00:00:00 2001 From: Jed Laundry Date: Wed, 7 Jan 2026 18:29:34 +0000 Subject: [PATCH 40/40] replace deprecated hyper::body::to_bytes Signed-off-by: Jed Laundry --- src/sinks/azure_logs_ingestion/service.rs | 3 +- src/sinks/azure_logs_ingestion/tests.rs | 79 +++++++++++++++++++++-- 2 files changed, 76 insertions(+), 6 deletions(-) diff --git a/src/sinks/azure_logs_ingestion/service.rs b/src/sinks/azure_logs_ingestion/service.rs index 154646189b3e9..261683c383a0b 100644 --- a/src/sinks/azure_logs_ingestion/service.rs +++ b/src/sinks/azure_logs_ingestion/service.rs @@ -157,8 +157,7 @@ impl AzureLogsIngestionService { } if res.status() == StatusCode::BAD_REQUEST { - #[allow(deprecated)] - let body_bytes: Bytes = hyper::body::to_bytes(res.into_body()).await.unwrap(); + let body_bytes: Bytes = http_body::Body::collect(res.into_body()).await?.to_bytes(); let body_string: String = String::from_utf8(body_bytes.to_vec()).unwrap(); let err_string: String = format!("Azure returned 400 Bad Request: {body_string}"); return Err(err_string.into()); diff --git a/src/sinks/azure_logs_ingestion/tests.rs b/src/sinks/azure_logs_ingestion/tests.rs index b157f7f121f79..3e8a1335c3fb1 100644 --- a/src/sinks/azure_logs_ingestion/tests.rs +++ b/src/sinks/azure_logs_ingestion/tests.rs @@ -1,6 +1,6 @@ +use bytes::Bytes; use futures::stream; use http::Response; -use hyper::body; use std::time::Duration; use tokio::time::timeout; use vector_lib::config::log_schema; @@ -265,9 +265,11 @@ async fn correct_request() { let (parts, body) = request.into_parts(); assert_eq!(&parts.method.to_string(), "POST"); - #[allow(deprecated)] - let body = body::to_bytes(body).await.unwrap(); - let body_json: serde_json::Value = serde_json::from_slice(&body[..]).unwrap(); + let body_bytes: Bytes = http_body::Body::collect(body) + .await + .expect("failed to collect body") + .to_bytes(); + let body_json: serde_json::Value = serde_json::from_slice(&body_bytes[..]).unwrap(); let expected_json = serde_json::json!([ { "TimeGenerated": timestamp_value1, @@ -311,6 +313,75 @@ fn create_mock_credential() -> impl TokenCredential { MockCredential } +#[tokio::test] +async fn mock_healthcheck_with_400_response() { + let config: AzureLogsIngestionConfig = toml::from_str( + r#" + endpoint = "http://localhost:9001" + dcr_immutable_id = "dcr-00000000000000000000000000000000" + stream_name = "Custom-UnitTest" + + [auth] + azure_tenant_id = "00000000-0000-0000-0000-000000000000" + azure_client_id = "mock-client-id" + azure_client_secret = "mock-client-secret" + "#, + ) + .unwrap(); + + let mut log1 = [("message", "hello")].iter().copied().collect::(); + let (_timestamp_key1, _timestamp_value1) = insert_timestamp_kv(&mut log1); + + let (endpoint_tx, _endpoint_rx) = tokio::sync::mpsc::channel(1); + let mock_endpoint = spawn_blackhole_http_server(move |request| { + let endpoint_tx = endpoint_tx.clone(); + async move { + endpoint_tx.send(request).await.unwrap(); + let body = serde_json::json!({ + "error": "Mock400ErrorResponse", + }) + .to_string(); + + Ok(Response::builder() + .status(400) + .header("Content-Type", "application/json") + .body(body.into()) + .unwrap()) + } + }) + .await; + + let context = SinkContext::default(); + let credential = std::sync::Arc::new(create_mock_credential()); + + let (_sink, healthcheck) = config + .build_inner( + context, + mock_endpoint.into(), + config.dcr_immutable_id.clone(), + config.stream_name.clone(), + credential, + config.token_scope.clone(), + config.timestamp_field.clone(), + ) + .await + .unwrap(); + + let hc_err = healthcheck.await.unwrap_err(); + let err_str = hc_err.to_string(); + // Both generic 400 "Bad Request", and our mock error message should be present + assert!( + err_str.contains("Bad Request"), + "Healthcheck error does not contain 'Bad Request': {}", + err_str + ); + assert!( + err_str.contains("Mock400ErrorResponse"), + "Healthcheck error does not contain 'Mock400ErrorResponse': {}", + err_str + ); +} + #[tokio::test] async fn mock_healthcheck_with_403_response() { let config: AzureLogsIngestionConfig = toml::from_str(