diff --git a/.github/actions/spelling/expect.txt b/.github/actions/spelling/expect.txt index 1d68a6978570b..a01a110782567 100644 --- a/.github/actions/spelling/expect.txt +++ b/.github/actions/spelling/expect.txt @@ -16,6 +16,7 @@ appname aqf architecting archs +ASIM assertverify atag aton @@ -129,6 +130,7 @@ datasources datid datname DBserver +dcr ddagent ddev ddmetrics @@ -329,6 +331,8 @@ majorly makecache Makefiles mallocs +managedidentity +managedidentityclientassertion markdownify marketo maxbin @@ -652,6 +656,7 @@ wkkmmawf wmem woooooow woothee +workloadidentity workstreams writeback wtcache diff --git a/.github/workflows/semantic.yml b/.github/workflows/semantic.yml index 88fd9e4cb6451..85b9d33189d08 100644 --- a/.github/workflows/semantic.yml +++ b/.github/workflows/semantic.yml @@ -206,6 +206,7 @@ jobs: aws_sqs sink axiom sink azure_blob sink + azure_logs_ingestion sink azure_monitor_logs sink blackhole sink clickhouse sink diff --git a/Cargo.toml b/Cargo.toml index 102ae70602f00..df4eeef5dead6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -787,6 +787,7 @@ sinks-logs = [ "sinks-aws_sqs", "sinks-axiom", "sinks-azure_blob", + "sinks-azure_logs_ingestion", "sinks-azure_monitor_logs", "sinks-blackhole", "sinks-chronicle", @@ -854,6 +855,7 @@ sinks-aws_sqs = ["aws-core", "dep:aws-sdk-sqs"] sinks-aws_sns = ["aws-core", "dep:aws-sdk-sns"] sinks-axiom = ["sinks-http"] sinks-azure_blob = ["dep:azure_core", "dep:azure_identity", "dep:azure_storage", "dep:azure_storage_blobs"] +sinks-azure_logs_ingestion = ["dep:azure_core", "dep:azure_identity"] sinks-azure_monitor_logs = [] sinks-blackhole = [] sinks-chronicle = [] diff --git a/changelog.d/22912_add_azure_logs_ingestion.feature.md b/changelog.d/22912_add_azure_logs_ingestion.feature.md new file mode 100644 index 0000000000000..38cc6a8c169e4 --- /dev/null +++ b/changelog.d/22912_add_azure_logs_ingestion.feature.md @@ -0,0 +1,5 @@ +Add support for the Azure Monitor Logs Ingestion API through a new `azure_logs_ingestion` sink. + +The `azure_monitor_logs` sink is now deprecated, and current users will need to migrate to `azure_logs_ingestion` before Microsoft ends support for the old Data Collector API (currently scheduled for September 2026).
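+
+For reference, a minimal configuration for the new sink might look like the following sketch (the endpoint, DCR, and stream values reuse the documented placeholder examples; `my_source` is hypothetical, and `auth` also accepts service-principal credentials via `azure_tenant_id`, `azure_client_id`, and `azure_client_secret`):
+
+```toml
+[sinks.azure]
+type = "azure_logs_ingestion"
+inputs = ["my_source"]
+endpoint = "https://my-dce-5kyl.eastus-1.ingest.monitor.azure.com"
+dcr_immutable_id = "dcr-000a00a000a00000a000000aa000a0aa"
+stream_name = "Custom-MyTable"
+
+# One of several supported auth modes.
+[sinks.azure.auth]
+azure_credential_kind = "managedidentity"
+```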
+ +authors: jlaundry diff --git a/src/sinks/azure_logs_ingestion/config.rs b/src/sinks/azure_logs_ingestion/config.rs new file mode 100644 index 0000000000000..a867bf9b67472 --- /dev/null +++ b/src/sinks/azure_logs_ingestion/config.rs @@ -0,0 +1,396 @@ +use std::sync::Arc; + +use azure_core::credentials::TokenCredential; +use azure_core::{Error, error::ErrorKind}; + +use azure_identity::{ + AzureCliCredential, ClientAssertion, ClientAssertionCredential, ClientSecretCredential, + ManagedIdentityCredential, ManagedIdentityCredentialOptions, UserAssignedId, + WorkloadIdentityCredential, +}; +use vector_lib::{configurable::configurable_component, schema, sensitive_string::SensitiveString}; +use vrl::value::Kind; + +use crate::{ + http::{HttpClient, get_http_scheme_from_uri}, + sinks::{ + prelude::*, + util::{RealtimeSizeBasedDefaultBatchSettings, UriSerde, http::HttpStatusRetryLogic}, + }, +}; + +use super::{ + service::{AzureLogsIngestionResponse, AzureLogsIngestionService}, + sink::AzureLogsIngestionSink, +}; + +/// Max number of bytes in request body +const MAX_BATCH_SIZE: usize = 30 * 1024 * 1024; + +pub(super) fn default_scope() -> String { + "https://monitor.azure.com/.default".into() +} + +pub(super) fn default_timestamp_field() -> String { + "TimeGenerated".into() +} + +/// Configuration for the `azure_logs_ingestion` sink. +#[configurable_component(sink( + "azure_logs_ingestion", + "Publish log events to the Azure Monitor Logs Ingestion API." +))] +#[derive(Clone, Debug)] +#[serde(deny_unknown_fields)] +pub struct AzureLogsIngestionConfig { + /// The [Data collection endpoint URI][endpoint] associated with the Log Analytics workspace. + /// + /// [endpoint]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview + #[configurable(metadata( + docs::examples = "https://my-dce-5kyl.eastus-1.ingest.monitor.azure.com" + ))] + pub endpoint: String, + + /// The [Data collection rule immutable ID][dcr_immutable_id] for the Data collection endpoint. + /// + /// [dcr_immutable_id]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview + #[configurable(metadata(docs::examples = "dcr-000a00a000a00000a000000aa000a0aa"))] + pub dcr_immutable_id: String, + + /// The [Stream name][stream_name] for the Data collection rule. + /// + /// [stream_name]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview + #[configurable(metadata(docs::examples = "Custom-MyTable"))] + pub stream_name: String, + + #[configurable(derived)] + #[serde(default)] + pub auth: AzureAuthentication, + + /// [Token scope][token_scope] for dedicated Azure regions. + /// + /// [token_scope]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview + #[configurable(metadata(docs::examples = "https://monitor.azure.us/.default"))] + #[configurable(metadata(docs::examples = "https://monitor.azure.cn/.default"))] + #[serde(default = "default_scope")] + pub(super) token_scope: String, + + /// The destination field (column) for the timestamp. + /// + /// The setting of `log_schema.timestamp_key`, usually `timestamp`, is used as the source. + /// Most schemas use `TimeGenerated`, but some use `Timestamp` (legacy) or `EventStartTime` (ASIM); see [standard columns][std_columns].
+ /// + /// [std_columns]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/log-standard-columns#timegenerated + #[configurable(metadata(docs::examples = "EventStartTime"))] + #[configurable(metadata(docs::examples = "Timestamp"))] + #[serde(default = "default_timestamp_field")] + pub timestamp_field: String, + + #[configurable(derived)] + #[serde(default, skip_serializing_if = "crate::serde::is_default")] + pub encoding: Transformer, + + #[configurable(derived)] + #[serde(default)] + pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>, + + #[configurable(derived)] + #[serde(default)] + pub request: TowerRequestConfig, + + #[configurable(derived)] + pub tls: Option<TlsConfig>, + + #[configurable(derived)] + #[serde( + default, + deserialize_with = "crate::serde::bool_or_struct", + skip_serializing_if = "crate::serde::is_default" + )] + pub acknowledgements: AcknowledgementsConfig, +} + +impl Default for AzureLogsIngestionConfig { + fn default() -> Self { + Self { + endpoint: Default::default(), + dcr_immutable_id: Default::default(), + stream_name: Default::default(), + auth: Default::default(), + token_scope: default_scope(), + timestamp_field: default_timestamp_field(), + encoding: Default::default(), + batch: Default::default(), + request: Default::default(), + tls: None, + acknowledgements: Default::default(), + } + } +} + +mod azure_credential_kinds { + #[cfg(not(target_arch = "wasm32"))] + pub const AZURE_CLI: &str = "azurecli"; + pub const MANAGED_IDENTITY: &str = "managedidentity"; + pub const MANAGED_IDENTITY_CLIENT_ASSERTION: &str = "managedidentityclientassertion"; + pub const WORKLOAD_IDENTITY: &str = "workloadidentity"; +} + +/// Configuration of the authentication strategy for interacting with Azure services. +#[configurable_component] +#[derive(Clone, Debug, Derivative, Eq, PartialEq)] +#[derivative(Default)] +#[serde(deny_unknown_fields, untagged)] +pub enum AzureAuthentication { + /// Use client credentials + #[derivative(Default)] + ClientSecretCredential { + /// The [Azure Tenant ID][azure_tenant_id]. + #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))] + azure_tenant_id: String, + + /// The [Azure Client ID][azure_client_id]. + #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))] + azure_client_id: String, + + /// The [Azure Client Secret][azure_client_secret]. + #[configurable(metadata(docs::examples = "00-00~000000-0000000~0000000000000000000"))] + azure_client_secret: SensitiveString, + }, + + /// Use credentials from environment variables + SpecificAzureCredential { + /// The kind of Azure credential to use. + #[configurable(metadata(docs::examples = "azurecli"))] + #[configurable(metadata(docs::examples = "managedidentity"))] + #[configurable(metadata(docs::examples = "managedidentityclientassertion"))] + #[configurable(metadata(docs::examples = "workloadidentity"))] + azure_credential_kind: String, + + /// The User Assigned Managed Identity (Client ID) to use. Only applicable when `azure_credential_kind` is `managedidentity` or `managedidentityclientassertion`. + #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))] + #[serde(default, skip_serializing_if = "Option::is_none")] + user_assigned_managed_identity_id: Option<String>, + + /// The target Tenant ID to use. Only applicable when `azure_credential_kind` is `managedidentityclientassertion`.
+ #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))] + #[serde(default, skip_serializing_if = "Option::is_none")] + client_assertion_tenant_id: Option<String>, + + /// The target Client ID to use. Only applicable when `azure_credential_kind` is `managedidentityclientassertion`. + #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))] + #[serde(default, skip_serializing_if = "Option::is_none")] + client_assertion_client_id: Option<String>, + }, +} + +#[derive(Debug)] +struct ManagedIdentityClientAssertion { + credential: Arc<dyn TokenCredential>, + scope: String, +} + +#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] +#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] +impl ClientAssertion for ManagedIdentityClientAssertion { + async fn secret(&self) -> azure_core::Result<String> { + Ok(self + .credential + .get_token(&[&self.scope], None) + .await? + .token + .secret() + .to_string()) + } +} + +impl AzureAuthentication { + /// Returns the provider for the credentials based on the authentication mechanism chosen. + pub async fn credential(&self) -> azure_core::Result<Arc<dyn TokenCredential>> { + match self { + Self::ClientSecretCredential { + azure_tenant_id, + azure_client_id, + azure_client_secret, + } => { + if azure_tenant_id.is_empty() { + return Err(Error::with_message(ErrorKind::Credential, || { + "`auth.azure_tenant_id` is blank; either use `auth.azure_credential_kind`, or provide tenant ID, client ID, and secret.".to_string() + })); + } + if azure_client_id.is_empty() { + return Err(Error::with_message(ErrorKind::Credential, || { + "`auth.azure_client_id` is blank; either use `auth.azure_credential_kind`, or provide tenant ID, client ID, and secret.".to_string() + })); + } + if azure_client_secret.inner().is_empty() { + return Err(Error::with_message(ErrorKind::Credential, || { + "`auth.azure_client_secret` is blank; either use `auth.azure_credential_kind`, or provide tenant ID, client ID, and secret.".to_string() + })); + } + let secret: String = azure_client_secret.inner().into(); + let credential = ClientSecretCredential::new( + azure_tenant_id, + azure_client_id.clone(), + secret.into(), + None, + )?; + Ok(credential) + } + + Self::SpecificAzureCredential { + azure_credential_kind, + user_assigned_managed_identity_id, + client_assertion_tenant_id, + client_assertion_client_id, + } => { + let credential: Arc<dyn TokenCredential> = match azure_credential_kind + .replace(' ', "") + .to_lowercase() + .as_str() + { + #[cfg(not(target_arch = "wasm32"))] + azure_credential_kinds::AZURE_CLI => AzureCliCredential::new(None)?, + azure_credential_kinds::MANAGED_IDENTITY => { + let mut options = ManagedIdentityCredentialOptions::default(); + if let Some(id) = user_assigned_managed_identity_id { + options.user_assigned_id = Some(UserAssignedId::ClientId(id.clone())); + } + + ManagedIdentityCredential::new(Some(options))?
+ } + azure_credential_kinds::MANAGED_IDENTITY_CLIENT_ASSERTION => { + let (Some(tenant_id), Some(client_id)) = + (client_assertion_tenant_id, client_assertion_client_id) + else { + return Err(Error::with_message(ErrorKind::Credential, || { + "`auth.client_assertion_tenant_id` and `auth.client_assertion_client_id` must be set when using `auth.azure_credential_kind` of `managedidentityclientassertion`".to_string() + })); + }; + + let mut options = ManagedIdentityCredentialOptions::default(); + if let Some(id) = user_assigned_managed_identity_id { + options.user_assigned_id = Some(UserAssignedId::ClientId(id.clone())); + } + let msi: Arc<dyn TokenCredential> = + ManagedIdentityCredential::new(Some(options))?; + let assertion = ManagedIdentityClientAssertion { + credential: msi, + // Future: make this configurable for sovereign clouds? (no way to test...) + scope: "api://AzureADTokenExchange/.default".to_string(), + }; + + ClientAssertionCredential::new( + tenant_id.clone(), + client_id.clone(), + assertion, + None, + )? + } + azure_credential_kinds::WORKLOAD_IDENTITY => { + WorkloadIdentityCredential::new(None)? + } + _ => { + return Err(Error::with_message(ErrorKind::Credential, || { + format!( + "`auth.azure_credential_kind` `{azure_credential_kind}` is unknown/unsupported" + ) + })); + } + }; + Ok(credential) + } + } + } +} + +impl AzureLogsIngestionConfig { + #[allow(clippy::too_many_arguments)] + pub(super) async fn build_inner( + &self, + cx: SinkContext, + endpoint: UriSerde, + dcr_immutable_id: String, + stream_name: String, + credential: Arc<dyn TokenCredential>, + token_scope: String, + timestamp_field: String, + ) -> crate::Result<(VectorSink, Healthcheck)> { + let endpoint = endpoint.with_default_parts().uri; + let protocol = get_http_scheme_from_uri(&endpoint).to_string(); + + let batch_settings = self + .batch + .validate()? + .limit_max_bytes(MAX_BATCH_SIZE)?
+ .into_batcher_settings()?; + + let tls_settings = TlsSettings::from_options(self.tls.as_ref())?; + let client = HttpClient::new(Some(tls_settings), &cx.proxy)?; + + let service = AzureLogsIngestionService::new( + client, + endpoint, + dcr_immutable_id, + stream_name, + credential, + token_scope, + )?; + let healthcheck = service.healthcheck(); + + let retry_logic = + HttpStatusRetryLogic::new(|res: &AzureLogsIngestionResponse| res.http_status); + let request_settings = self.request.into_settings(); + let service = ServiceBuilder::new() + .settings(request_settings, retry_logic) + .service(service); + + let sink = AzureLogsIngestionSink::new( + batch_settings, + self.encoding.clone(), + service, + timestamp_field, + protocol, + ); + + Ok((VectorSink::from_event_streamsink(sink), healthcheck)) + } +} + +impl_generate_config_from_default!(AzureLogsIngestionConfig); + +#[async_trait::async_trait] +#[typetag::serde(name = "azure_logs_ingestion")] +impl SinkConfig for AzureLogsIngestionConfig { + async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { + let endpoint: UriSerde = self.endpoint.parse()?; + + let credential: Arc<dyn TokenCredential> = self.auth.credential().await?; + + self.build_inner( + cx, + endpoint, + self.dcr_immutable_id.clone(), + self.stream_name.clone(), + credential, + self.token_scope.clone(), + self.timestamp_field.clone(), + ) + .await + } + + fn input(&self) -> Input { + let requirements = + schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp()); + + Input::log().with_schema_requirement(requirements) + } + + fn acknowledgements(&self) -> &AcknowledgementsConfig { + &self.acknowledgements + } +} diff --git a/src/sinks/azure_logs_ingestion/mod.rs b/src/sinks/azure_logs_ingestion/mod.rs new file mode 100644 index 0000000000000..292bde918d227 --- /dev/null +++ b/src/sinks/azure_logs_ingestion/mod.rs @@ -0,0 +1,13 @@ +//! The Azure Logs Ingestion [`vector_lib::sink::VectorSink`] +//! +//! This module contains the [`vector_lib::sink::VectorSink`] instance that is responsible for +//! taking a stream of [`vector_lib::event::Event`] instances and forwarding them to the Azure +//! Logs Ingestion API.
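+//!
+//! In short: events are batched, JSON-encoded, and POSTed to
+//! `/dataCollectionRules/{dcr_immutable_id}/streams/{stream_name}?api-version={API_VERSION}`
+//! on the configured Data collection endpoint, with a bearer token obtained through
+//! `azure_identity` for the configured `token_scope`.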
+ +mod config; +mod service; +mod sink; +#[cfg(test)] +mod tests; + +pub use config::AzureLogsIngestionConfig; diff --git a/src/sinks/azure_logs_ingestion/service.rs b/src/sinks/azure_logs_ingestion/service.rs new file mode 100644 index 0000000000000..261683c383a0b --- /dev/null +++ b/src/sinks/azure_logs_ingestion/service.rs @@ -0,0 +1,197 @@ +use std::sync::Arc; +use std::sync::LazyLock; +use std::task::{Context, Poll}; + +use azure_core::credentials::{AccessToken, TokenCredential}; + +use bytes::Bytes; +use http::{ + HeaderValue, Request, StatusCode, Uri, + header::{self, HeaderMap}, +}; +use hyper::Body; +use tracing::Instrument; + +use crate::{http::HttpClient, sinks::prelude::*}; + +/// Log Ingestion API version +const API_VERSION: &str = "2023-01-01"; + +/// JSON content type of logs +const CONTENT_TYPE: &str = "application/json"; + +static CONTENT_TYPE_VALUE: LazyLock<HeaderValue> = + LazyLock::new(|| HeaderValue::from_static(CONTENT_TYPE)); + +#[derive(Debug, Clone)] +pub struct AzureLogsIngestionRequest { + pub body: Bytes, + pub finalizers: EventFinalizers, + pub metadata: RequestMetadata, +} + +impl MetaDescriptive for AzureLogsIngestionRequest { + fn get_metadata(&self) -> &RequestMetadata { + &self.metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.metadata + } +} + +impl Finalizable for AzureLogsIngestionRequest { + fn take_finalizers(&mut self) -> EventFinalizers { + self.finalizers.take_finalizers() + } +} + +pub struct AzureLogsIngestionResponse { + pub http_status: StatusCode, + pub events_byte_size: GroupedCountByteSize, + pub raw_byte_size: usize, +} + +impl DriverResponse for AzureLogsIngestionResponse { + fn event_status(&self) -> EventStatus { + if self.http_status.is_success() { + EventStatus::Delivered + } else { + EventStatus::Rejected + } + } + + fn events_sent(&self) -> &GroupedCountByteSize { + &self.events_byte_size + } + + fn bytes_sent(&self) -> Option<usize> { + Some(self.raw_byte_size) + } +} + +/// `AzureLogsIngestionService` is a `Tower` service used to send logs to Azure. +#[derive(Debug, Clone)] +pub struct AzureLogsIngestionService { + client: HttpClient, + endpoint: Uri, + credential: Arc<dyn TokenCredential>, + token_scope: String, + default_headers: HeaderMap, +} + +impl AzureLogsIngestionService { + /// Creates a new `AzureLogsIngestionService`.
+ pub fn new( + client: HttpClient, + endpoint: Uri, + dcr_immutable_id: String, + stream_name: String, + credential: Arc<dyn TokenCredential>, + token_scope: String, + ) -> crate::Result<Self> { + let mut parts = endpoint.into_parts(); + parts.path_and_query = Some( + format!("/dataCollectionRules/{dcr_immutable_id}/streams/{stream_name}?api-version={API_VERSION}") + .parse() + .expect("path and query should never fail to parse"), + ); + let endpoint = Uri::from_parts(parts)?; + + let default_headers = { + let mut headers = HeaderMap::new(); + + headers.insert(header::CONTENT_TYPE, CONTENT_TYPE_VALUE.clone()); + headers + }; + + Ok(Self { + client, + endpoint, + credential, + token_scope, + default_headers, + }) + } + + async fn build_request(&self, body: Bytes) -> crate::Result<Request<Body>> { + let mut request = Request::post(&self.endpoint).body(Body::from(body))?; + + // TODO: make this an option, for sovereign clouds + let access_token: AccessToken = self + .credential + .get_token(&[&self.token_scope], None) + .await?; + + let bearer = format!("Bearer {}", access_token.token.secret()); + + *request.headers_mut() = self.default_headers.clone(); + request.headers_mut().insert( + header::AUTHORIZATION, + HeaderValue::from_str(&bearer)?, + ); + + Ok(request) + } + + pub fn healthcheck(&self) -> Healthcheck { + let service = self.clone(); + let mut client = service.client.clone(); + Box::pin(async move { + let request = service.build_request(Bytes::from("[]")).await?; + let res = client.call(request).in_current_span().await?; + + if res.status().is_server_error() { + return Err("Azure returned a server error".into()); + } + + if res.status() == StatusCode::UNAUTHORIZED { + return Err("Azure returned 401 Unauthorized. Check that the token_scope matches the sovereign cloud endpoint.".into()); + } + + if res.status() == StatusCode::FORBIDDEN { + return Err("Azure returned 403 Forbidden. Verify that the credential has the Monitoring Metrics Publisher role on the Data Collection Rule.".into()); + } + + if res.status() == StatusCode::NOT_FOUND { + return Err("Azure returned 404 Not Found. Either the URL provided is incorrect, or the request is too large.".into()); + } + + if res.status() == StatusCode::BAD_REQUEST { + let body_bytes: Bytes = http_body::Body::collect(res.into_body()).await?.to_bytes(); + let body_string = String::from_utf8_lossy(&body_bytes); + let err_string: String = format!("Azure returned 400 Bad Request: {body_string}"); + return Err(err_string.into()); + } + + Ok(()) + }) + } +} + +impl Service<AzureLogsIngestionRequest> for AzureLogsIngestionService { + type Response = AzureLogsIngestionResponse; + type Error = crate::Error; + type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>; + + // Emission of Error internal event is handled upstream by the caller. + fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> { + Poll::Ready(Ok(())) + } + + // Emission of Error internal event is handled upstream by the caller.
+ fn call(&mut self, request: AzureLogsIngestionRequest) -> Self::Future { + let service = self.clone(); + let mut client = service.client.clone(); + Box::pin(async move { + let http_request = service.build_request(request.body).await?; + let response = client.call(http_request).in_current_span().await?; + Ok(AzureLogsIngestionResponse { + http_status: response.status(), + raw_byte_size: request.metadata.request_encoded_size(), + events_byte_size: request + .metadata + .into_events_estimated_json_encoded_byte_size(), + }) + }) + } +} diff --git a/src/sinks/azure_logs_ingestion/sink.rs b/src/sinks/azure_logs_ingestion/sink.rs new file mode 100644 index 0000000000000..5493e4821f433 --- /dev/null +++ b/src/sinks/azure_logs_ingestion/sink.rs @@ -0,0 +1,173 @@ +use std::{fmt::Debug, io}; + +use bytes::Bytes; +use vector_lib::codecs::{CharacterDelimitedEncoder, JsonSerializerConfig, encoding::Framer}; +use vector_lib::lookup::PathPrefix; + +use crate::sinks::prelude::*; + +use super::service::AzureLogsIngestionRequest; + +pub struct AzureLogsIngestionSink<S> { + batch_settings: BatcherSettings, + encoding: JsonEncoding, + service: S, + protocol: String, +} + +impl<S> AzureLogsIngestionSink<S> +where + S: Service<AzureLogsIngestionRequest> + Send + 'static, + S::Future: Send + 'static, + S::Response: DriverResponse + Send + 'static, + S::Error: Debug + Into<crate::Error> + Send, +{ + pub fn new( + batch_settings: BatcherSettings, + transformer: Transformer, + service: S, + timestamp_field: String, + protocol: String, + ) -> Self { + Self { + batch_settings, + encoding: JsonEncoding::new(transformer, timestamp_field), + service, + protocol, + } + } + + async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> { + input + .batched(self.batch_settings.as_byte_size_config()) + .request_builder( + default_request_builder_concurrency_limit(), + AzureLogsIngestionRequestBuilder { + encoding: self.encoding, + }, + ) + .filter_map(|request| async { + match request { + Err(error) => { + emit!(SinkRequestBuildError { error }); + None + } + Ok(req) => Some(req), + } + }) + .into_driver(self.service) + .protocol(self.protocol.clone()) + .run() + .await + } +} + +#[async_trait::async_trait] +impl<S> StreamSink<Event> for AzureLogsIngestionSink<S> +where + S: Service<AzureLogsIngestionRequest> + Send + 'static, + S::Future: Send + 'static, + S::Response: DriverResponse + Send + 'static, + S::Error: Debug + Into<crate::Error> + Send, +{ + async fn run( + self: Box<Self>, + input: futures_util::stream::BoxStream<'_, Event>, + ) -> Result<(), ()> { + self.run_inner(input).await + } +} + +/// Customized encoding specific to the Azure Logs Ingestion sink. +#[derive(Clone, Debug)] +pub(super) struct JsonEncoding { + timestamp_field: String, + encoder: (Transformer, Encoder<Framer>), +} + +impl JsonEncoding { + pub fn new(transformer: Transformer, timestamp_field: String) -> Self { + Self { + timestamp_field, + encoder: ( + transformer, + Encoder::<Framer>::new( + CharacterDelimitedEncoder::new(b',').into(), + JsonSerializerConfig::default().build().into(), + ), + ), + } + } +} + +impl crate::sinks::util::encoding::Encoder<Vec<Event>> for JsonEncoding { + fn encode_input( + &self, + mut input: Vec<Event>, + writer: &mut dyn io::Write, + ) -> io::Result<(usize, GroupedCountByteSize)> { + for event in input.iter_mut() { + let log = event.as_mut_log(); + + // `.remove_timestamp()` returns the `timestamp` value regardless of its location in the Event or + // Metadata; the following `insert()` ensures it is encoded in the request.
+ let timestamp = if let Some(Value::Timestamp(ts)) = log.remove_timestamp() { + ts + } else { + chrono::Utc::now() + }; + + log.insert( + (PathPrefix::Event, self.timestamp_field.as_str()), + serde_json::Value::String( + timestamp.to_rfc3339_opts(chrono::SecondsFormat::Micros, true), + ), + ); + } + + self.encoder.encode_input(input, writer) + } +} + +struct AzureLogsIngestionRequestBuilder { + encoding: JsonEncoding, +} + +impl RequestBuilder<Vec<Event>> for AzureLogsIngestionRequestBuilder { + type Metadata = EventFinalizers; + type Events = Vec<Event>; + type Encoder = JsonEncoding; + type Payload = Bytes; + type Request = AzureLogsIngestionRequest; + type Error = std::io::Error; + + fn compression(&self) -> Compression { + Compression::None + } + + fn encoder(&self) -> &Self::Encoder { + &self.encoding + } + + fn split_input( + &self, + mut events: Vec<Event>, + ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) { + let finalizers = events.take_finalizers(); + let builder = RequestMetadataBuilder::from_events(&events); + (finalizers, builder, events) + } + + fn build_request( + &self, + finalizers: Self::Metadata, + request_metadata: RequestMetadata, + payload: EncodeResult<Self::Payload>, + ) -> Self::Request { + AzureLogsIngestionRequest { + body: payload.into_payload(), + finalizers, + metadata: request_metadata, + } + } +} diff --git a/src/sinks/azure_logs_ingestion/tests.rs b/src/sinks/azure_logs_ingestion/tests.rs new file mode 100644 index 0000000000000..3e8a1335c3fb1 --- /dev/null +++ b/src/sinks/azure_logs_ingestion/tests.rs @@ -0,0 +1,446 @@ +use bytes::Bytes; +use futures::stream; +use http::Response; +use std::time::Duration; +use tokio::time::timeout; +use vector_lib::config::log_schema; + +use azure_core::credentials::{AccessToken, TokenCredential, TokenRequestOptions}; +use azure_core::date::OffsetDateTime; +use azure_identity::{ + ClientSecretCredential, ClientSecretCredentialOptions, TokenCredentialOptions, +}; + +use super::config::AzureLogsIngestionConfig; + +use crate::{ + event::LogEvent, + sinks::prelude::*, + test_util::{ + components::{SINK_TAGS, run_and_assert_sink_compliance}, + http::spawn_blackhole_http_server, + }, +}; + +#[test] +fn generate_config() { + crate::test_util::test_generate_config::<AzureLogsIngestionConfig>(); +} + +#[tokio::test] +async fn basic_config_error_with_no_auth() { + let config: AzureLogsIngestionConfig = toml::from_str::<AzureLogsIngestionConfig>( + r#" + endpoint = "https://my-dce-5kyl.eastus-1.ingest.monitor.azure.com" + dcr_immutable_id = "dcr-00000000000000000000000000000000" + stream_name = "Custom-UnitTest" + "#, + ) + .expect("Config parsing failed"); + + assert_eq!( + config.endpoint, + "https://my-dce-5kyl.eastus-1.ingest.monitor.azure.com" + ); + assert_eq!( + config.dcr_immutable_id, + "dcr-00000000000000000000000000000000" + ); + assert_eq!(config.stream_name, "Custom-UnitTest"); + assert_eq!(config.token_scope, "https://monitor.azure.com/.default"); + assert_eq!(config.timestamp_field, "TimeGenerated"); + + match &config.auth { + crate::sinks::azure_logs_ingestion::config::AzureAuthentication::ClientSecretCredential { + azure_tenant_id, + azure_client_id, + azure_client_secret, + } => { + assert_eq!(azure_tenant_id, ""); + assert_eq!(azure_client_id, ""); + let secret: String = azure_client_secret.inner().into(); + assert_eq!(secret, ""); + } + _ => panic!("Expected ClientSecretCredential variant"), + } + + let cx = SinkContext::default(); + let sink = config.build(cx).await; + match sink { + Ok(_) => panic!("Config build should have errored due to missing auth info"), + Err(e) => { + let err_str = e.to_string(); +
assert!( + err_str.contains("`auth.azure_tenant_id` is blank"), + "Config build did not complain about azure_tenant_id being blank: {}", + err_str + ); + } + } +} + +#[test] +fn basic_config_with_client_credentials() { + let config: AzureLogsIngestionConfig = toml::from_str::<AzureLogsIngestionConfig>( + r#" + endpoint = "https://my-dce-5kyl.eastus-1.ingest.monitor.azure.com" + dcr_immutable_id = "dcr-00000000000000000000000000000000" + stream_name = "Custom-UnitTest" + + [auth] + azure_tenant_id = "00000000-0000-0000-0000-000000000000" + azure_client_id = "mock-client-id" + azure_client_secret = "mock-client-secret" + "#, + ) + .expect("Config parsing failed"); + + assert_eq!( + config.endpoint, + "https://my-dce-5kyl.eastus-1.ingest.monitor.azure.com" + ); + assert_eq!( + config.dcr_immutable_id, + "dcr-00000000000000000000000000000000" + ); + assert_eq!(config.stream_name, "Custom-UnitTest"); + assert_eq!(config.token_scope, "https://monitor.azure.com/.default"); + assert_eq!(config.timestamp_field, "TimeGenerated"); + + match &config.auth { + crate::sinks::azure_logs_ingestion::config::AzureAuthentication::ClientSecretCredential { + azure_tenant_id, + azure_client_id, + azure_client_secret, + } => { + assert_eq!(azure_tenant_id, "00000000-0000-0000-0000-000000000000"); + assert_eq!(azure_client_id, "mock-client-id"); + let secret: String = azure_client_secret.inner().into(); + assert_eq!(secret, "mock-client-secret"); + } + _ => panic!("Expected ClientSecretCredential variant"), + } +} + +#[test] +fn basic_config_with_managed_identity() { + let config: AzureLogsIngestionConfig = toml::from_str::<AzureLogsIngestionConfig>( + r#" + endpoint = "https://my-dce-5kyl.eastus-1.ingest.monitor.azure.com" + dcr_immutable_id = "dcr-00000000000000000000000000000000" + stream_name = "Custom-UnitTest" + + [auth] + azure_credential_kind = "managedidentity" + "#, + ) + .expect("Config parsing failed"); + + assert_eq!( + config.endpoint, + "https://my-dce-5kyl.eastus-1.ingest.monitor.azure.com" + ); + assert_eq!( + config.dcr_immutable_id, + "dcr-00000000000000000000000000000000" + ); + assert_eq!(config.stream_name, "Custom-UnitTest"); + assert_eq!(config.token_scope, "https://monitor.azure.com/.default"); + assert_eq!(config.timestamp_field, "TimeGenerated"); + + match &config.auth { + crate::sinks::azure_logs_ingestion::config::AzureAuthentication::SpecificAzureCredential { + azure_credential_kind, + user_assigned_managed_identity_id: _, + client_assertion_tenant_id: _, + client_assertion_client_id: _, + } => { + assert_eq!(azure_credential_kind, "managedidentity"); + } + _ => panic!("Expected SpecificAzureCredential variant"), + } +} + +// TODO test config with ManagedIdentity (will need to mock env vars...)
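+// For reference, an (untested) client-assertion configuration would follow the fields on
+// `AzureAuthentication::SpecificAzureCredential`: `azure_credential_kind = "managedidentityclientassertion"`
+// plus `client_assertion_tenant_id` and `client_assertion_client_id`, and optionally
+// `user_assigned_managed_identity_id`.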
+ +fn insert_timestamp_kv(log: &mut LogEvent) -> (String, String) { + let now = chrono::Utc::now(); + + let timestamp_value = now.to_rfc3339_opts(chrono::SecondsFormat::Micros, true); + log.insert(log_schema().timestamp_key_target_path().unwrap(), now); + + ( + log_schema().timestamp_key().unwrap().to_string(), + timestamp_value, + ) +} + +#[tokio::test] +async fn correct_request() { + // Other tests can use `create_mock_credential`; this end-to-end test also runs its own mock OAuth endpoint + let (authority_tx, mut _authority_rx) = tokio::sync::mpsc::channel(1); + let mock_token_authority = spawn_blackhole_http_server(move |request| { + let authority_tx = authority_tx.clone(); + async move { + authority_tx.send(request).await.unwrap(); + let body = serde_json::json!({ + "access_token": "mock-access-token", + "token_type": "Bearer", + "expires_in": 3600 + }) + .to_string(); + + Ok(Response::builder() + .header("Content-Type", "application/json") + .body(body.into()) + .unwrap()) + } + }) + .await; + + let mut credential_options = TokenCredentialOptions::default(); + credential_options.set_authority_host(mock_token_authority.to_string()); + + let credential: std::sync::Arc<dyn TokenCredential> = ClientSecretCredential::new( + "00000000-0000-0000-0000-000000000000", + "mock-client-id".into(), + "mock-client-secret".into(), + Some(ClientSecretCredentialOptions { credential_options }), + ) + .expect("failed to create ClientSecretCredential"); + + let config: AzureLogsIngestionConfig = toml::from_str( + r#" + endpoint = "http://localhost:9001" + dcr_immutable_id = "dcr-00000000000000000000000000000000" + stream_name = "Custom-UnitTest" + + [auth] + azure_tenant_id = "00000000-0000-0000-0000-000000000000" + azure_client_id = "mock-client-id" + azure_client_secret = "mock-client-secret" + "#, + ) + .unwrap(); + + let mut log1 = [("message", "hello")].iter().copied().collect::<LogEvent>(); + let (_timestamp_key1, timestamp_value1) = insert_timestamp_kv(&mut log1); + + let mut log2 = [("message", "world")].iter().copied().collect::<LogEvent>(); + let (_timestamp_key2, timestamp_value2) = insert_timestamp_kv(&mut log2); + + let (endpoint_tx, mut endpoint_rx) = tokio::sync::mpsc::channel(1); + let mock_endpoint = spawn_blackhole_http_server(move |request| { + let endpoint_tx = endpoint_tx.clone(); + async move { + endpoint_tx.send(request).await.unwrap(); + Ok(Response::new(hyper::Body::empty())) + } + }) + .await; + + let context = SinkContext::default(); + + let (sink, _healthcheck) = config + .build_inner( + context, + mock_endpoint.into(), + config.dcr_immutable_id.clone(), + config.stream_name.clone(), + credential, + config.token_scope.clone(), + config.timestamp_field.clone(), + ) + .await + .unwrap(); + + run_and_assert_sink_compliance(sink, stream::iter(vec![log1, log2]), &SINK_TAGS).await; + + let request = timeout(Duration::from_millis(500), endpoint_rx.recv()) + .await + .unwrap() + .unwrap(); + + let (parts, body) = request.into_parts(); + assert_eq!(&parts.method.to_string(), "POST"); + + let body_bytes: Bytes = http_body::Body::collect(body) + .await + .expect("failed to collect body") + .to_bytes(); + let body_json: serde_json::Value = serde_json::from_slice(&body_bytes[..]).unwrap(); + let expected_json = serde_json::json!([ + { + "TimeGenerated": timestamp_value1, + "message": "hello" + }, + { + "TimeGenerated": timestamp_value2, + "message": "world" + } + ]); + assert_eq!(body_json, expected_json); + + let headers = parts.headers; + let authorization = headers.get("Authorization").unwrap(); + assert_eq!(authorization.to_str().unwrap(), "Bearer mock-access-token"); + + assert_eq!( + &parts.uri.path_and_query().unwrap().to_string(), + "/dataCollectionRules/dcr-00000000000000000000000000000000/streams/Custom-UnitTest?api-version=2023-01-01" + ); +} + +fn create_mock_credential() -> impl TokenCredential { + #[derive(Debug)] + struct MockCredential; + + #[async_trait::async_trait] + impl TokenCredential for MockCredential { + async fn get_token( + &self, + _scopes: &[&str], + _options: Option<TokenRequestOptions>, + ) -> azure_core::Result<AccessToken> { + Ok(AccessToken::new( + "mock-access-token".to_string(), + // One hour from now; `Duration::from_hours` is not stable, so use seconds. + OffsetDateTime::now_utc() + Duration::from_secs(3600), + )) + } + } + + MockCredential +} + +#[tokio::test] +async fn mock_healthcheck_with_400_response() { + let config: AzureLogsIngestionConfig = toml::from_str( + r#" + endpoint = "http://localhost:9001" + dcr_immutable_id = "dcr-00000000000000000000000000000000" + stream_name = "Custom-UnitTest" + + [auth] + azure_tenant_id = "00000000-0000-0000-0000-000000000000" + azure_client_id = "mock-client-id" + azure_client_secret = "mock-client-secret" + "#, + ) + .unwrap(); + + let mut log1 = [("message", "hello")].iter().copied().collect::<LogEvent>(); + let (_timestamp_key1, _timestamp_value1) = insert_timestamp_kv(&mut log1); + + let (endpoint_tx, _endpoint_rx) = tokio::sync::mpsc::channel(1); + let mock_endpoint = spawn_blackhole_http_server(move |request| { + let endpoint_tx = endpoint_tx.clone(); + async move { + endpoint_tx.send(request).await.unwrap(); + let body = serde_json::json!({ + "error": "Mock400ErrorResponse", + }) + .to_string(); + + Ok(Response::builder() + .status(400) + .header("Content-Type", "application/json") + .body(body.into()) + .unwrap()) + } + }) + .await; + + let context = SinkContext::default(); + let credential = std::sync::Arc::new(create_mock_credential()); + + let (_sink, healthcheck) = config + .build_inner( + context, + mock_endpoint.into(), + config.dcr_immutable_id.clone(), + config.stream_name.clone(), + credential, + config.token_scope.clone(), + config.timestamp_field.clone(), + ) + .await + .unwrap(); + + let hc_err = healthcheck.await.unwrap_err(); + let err_str = hc_err.to_string(); + // Both the generic 400 "Bad Request" and our mock error message should be present + assert!( + err_str.contains("Bad Request"), + "Healthcheck error does not contain 'Bad Request': {}", + err_str + ); + assert!( + err_str.contains("Mock400ErrorResponse"), + "Healthcheck error does not contain 'Mock400ErrorResponse': {}", + err_str + ); +} + +#[tokio::test] +async fn mock_healthcheck_with_403_response() { + let config: AzureLogsIngestionConfig = toml::from_str( + r#" + endpoint = "http://localhost:9001" + dcr_immutable_id = "dcr-00000000000000000000000000000000" + stream_name = "Custom-UnitTest" + + [auth] + azure_tenant_id = "00000000-0000-0000-0000-000000000000" + azure_client_id = "mock-client-id" + azure_client_secret = "mock-client-secret" + "#, + ) + .unwrap(); + + let mut log1 = [("message", "hello")].iter().copied().collect::<LogEvent>(); + let (_timestamp_key1, _timestamp_value1) = insert_timestamp_kv(&mut log1); + + let (endpoint_tx, _endpoint_rx) = tokio::sync::mpsc::channel(1); + let mock_endpoint = spawn_blackhole_http_server(move |request| { + let endpoint_tx = endpoint_tx.clone(); + async move { + endpoint_tx.send(request).await.unwrap(); + let body = serde_json::json!({ + "error": "bla", + }) + .to_string(); + + Ok(Response::builder() + .status(403) +
.header("Content-Type", "application/json") + .body(body.into()) + .unwrap()) + } + }) + .await; + + let context = SinkContext::default(); + let credential = std::sync::Arc::new(create_mock_credential()); + + let (_sink, healthcheck) = config + .build_inner( + context, + mock_endpoint.into(), + config.dcr_immutable_id.clone(), + config.stream_name.clone(), + credential, + config.token_scope.clone(), + config.timestamp_field.clone(), + ) + .await + .unwrap(); + + let hc_err = healthcheck.await.unwrap_err(); + let err_str = hc_err.to_string(); + assert!( + err_str.contains("Forbidden"), + "Healthcheck error does not contain 'Forbidden': {}", + err_str + ); +} diff --git a/src/sinks/azure_monitor_logs/config.rs b/src/sinks/azure_monitor_logs/config.rs index 0b923894f1b3e..dc60d179a2ff5 100644 --- a/src/sinks/azure_monitor_logs/config.rs +++ b/src/sinks/azure_monitor_logs/config.rs @@ -30,7 +30,7 @@ pub(super) fn default_host() -> String { /// Configuration for the `azure_monitor_logs` sink. #[configurable_component(sink( "azure_monitor_logs", - "Publish log events to the Azure Monitor Logs service." + "Publish log events to the Azure Monitor Data Collector API." ))] #[derive(Clone, Debug)] #[serde(deny_unknown_fields)] diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs index 75dcd683f46a5..3a9c3c049a0fa 100644 --- a/src/sinks/mod.rs +++ b/src/sinks/mod.rs @@ -28,6 +28,8 @@ pub mod axiom; pub mod azure_blob; #[cfg(feature = "sinks-azure_blob")] pub mod azure_common; +#[cfg(feature = "sinks-azure_logs_ingestion")] +pub mod azure_logs_ingestion; #[cfg(feature = "sinks-azure_monitor_logs")] pub mod azure_monitor_logs; #[cfg(feature = "sinks-blackhole")] diff --git a/website/cue/reference/components/sinks/azure_logs_ingestion.cue b/website/cue/reference/components/sinks/azure_logs_ingestion.cue new file mode 100644 index 0000000000000..0b3fec1187eb0 --- /dev/null +++ b/website/cue/reference/components/sinks/azure_logs_ingestion.cue @@ -0,0 +1,81 @@ +package metadata + +components: sinks: azure_logs_ingestion: { + title: "Azure Logs Ingestion" + + description: """ + This sink uses the Azure Monitor Logs Ingestion API to send log events to a Log Analytics Workspace. + + The `azure_identity` crate is used for authentication, which supports standard Azure authentication types + (Service Principal, Managed Identity, Workload Identity, Azure CLI, etc.). However, because this crate + implements `DefaultAzureCredential` differently from other language SDKs, the authentication type must be + explicitly configured: either by providing `azure_tenant_id`, `azure_client_id`, and `azure_client_secret`, + or by selecting one of the `azure_credential_kind` options.
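+
+ A minimal service-principal `auth` block might look like this sketch (the IDs are placeholders):
+
+ ```toml
+ [sinks.azure_logs.auth]
+ azure_tenant_id = "00000000-0000-0000-0000-000000000000"
+ azure_client_id = "00000000-0000-0000-0000-000000000000"
+ azure_client_secret = "${AZURE_CLIENT_SECRET}"
+ ```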
+ """ + + classes: { + commonly_used: false + delivery: "at_least_once" + development: "beta" + egress_method: "batch" + service_providers: ["Azure"] + stateful: false + } + + features: { + auto_generated: true + acknowledgements: true + healthcheck: enabled: true + send: { + batch: { + enabled: true + common: false + max_bytes: 10_000_000 + timeout_secs: 1.0 + } + compression: enabled: false + encoding: { + enabled: true + codec: enabled: false + } + proxy: enabled: true + request: enabled: false + tls: { + enabled: true + can_verify_certificate: true + can_verify_hostname: true + enabled_default: true + enabled_by_scheme: true + } + to: { + service: services.azure_logs_ingestion + + interface: { + socket: { + api: { + title: "Azure Monitor Logs Ingestion API" + url: urls.azure_logs_ingestion_endpoints + } + direction: "outgoing" + protocols: ["http"] + ssl: "required" + } + } + } + } + } + + support: { + requirements: [] + warnings: [] + notices: [] + } + + configuration: generated.components.sinks.azure_logs_ingestion.configuration + + input: { + logs: true + metrics: null + traces: false + } +} diff --git a/website/cue/reference/components/sinks/azure_monitor_logs.cue b/website/cue/reference/components/sinks/azure_monitor_logs.cue index 1d421b0f63ada..e764495d9304b 100644 --- a/website/cue/reference/components/sinks/azure_monitor_logs.cue +++ b/website/cue/reference/components/sinks/azure_monitor_logs.cue @@ -6,7 +6,7 @@ components: sinks: azure_monitor_logs: { classes: { commonly_used: false delivery: "at_least_once" - development: "beta" + development: "deprecated" egress_method: "batch" service_providers: ["Azure"] stateful: false @@ -57,7 +57,13 @@ components: sinks: azure_monitor_logs: { support: { requirements: [] - warnings: [] + warnings: [ + """ + The upstream Data Collector API [has been deprecated](urls.azure_monitor_data_collector_deprecation), + and will stop working in September 2026. Consider migrating to the `azure_logs_ingestion` sink, which + requires creating Data Collection Endpoint and Data Collection Rule resources in Azure. + """, + ] notices: [] } diff --git a/website/cue/reference/components/sinks/generated/azure_logs_ingestion.cue b/website/cue/reference/components/sinks/generated/azure_logs_ingestion.cue new file mode 100644 index 0000000000000..90de0a6676c57 --- /dev/null +++ b/website/cue/reference/components/sinks/generated/azure_logs_ingestion.cue @@ -0,0 +1,474 @@ +package metadata + +generated: components: sinks: azure_logs_ingestion: configuration: { + acknowledgements: { + description: """ + Controls how acknowledgements are handled for this sink. + + See [End-to-end Acknowledgements][e2e_acks] for more information on how event acknowledgement is handled. + + [e2e_acks]: https://vector.dev/docs/architecture/end-to-end-acknowledgements/ + """ + required: false + type: object: options: enabled: { + description: """ + Controls whether or not end-to-end acknowledgements are enabled. + + When enabled for a sink, any source that supports end-to-end + acknowledgements that is connected to that sink waits for events + to be acknowledged by **all connected sinks** before acknowledging them at the source. + + Enabling or disabling acknowledgements at the sink level takes precedence over any global + [`acknowledgements`][global_acks] configuration. 
+ + [global_acks]: https://vector.dev/docs/reference/configuration/global-options/#acknowledgements + """ + required: false + type: bool: {} + } + } + auth: { + description: "Configuration of the authentication strategy for interacting with Azure services." + required: false + type: object: options: { + azure_client_id: { + description: "The [Azure Client ID][azure_client_id]." + required: false + type: string: { + default: "" + examples: ["00000000-0000-0000-0000-000000000000"] + } + } + azure_client_secret: { + description: "The [Azure Client Secret][azure_client_secret]." + required: false + type: string: { + default: "" + examples: ["00-00~000000-0000000~0000000000000000000"] + } + } + azure_credential_kind: { + description: "The kind of Azure credential to use." + required: true + type: string: examples: ["azurecli", "managedidentity", "managedidentityclientassertion", "workloadidentity"] + } + azure_tenant_id: { + description: "The [Azure Tenant ID][azure_tenant_id]." + required: false + type: string: { + default: "" + examples: ["00000000-0000-0000-0000-000000000000"] + } + } + client_assertion_client_id: { + description: "The target Client ID to use. Only applicable when `azure_credential_kind` is `managedidentityclientassertion`." + required: false + type: string: examples: ["00000000-0000-0000-0000-000000000000"] + } + client_assertion_tenant_id: { + description: "The target Tenant ID to use. Only applicable when `azure_credential_kind` is `managedidentityclientassertion`." + required: false + type: string: examples: ["00000000-0000-0000-0000-000000000000"] + } + user_assigned_managed_identity_id: { + description: "The User Assigned Managed Identity (Client ID) to use. Only applicable when `azure_credential_kind` is `managedidentity` or `managedidentityclientassertion`." + required: false + type: string: examples: ["00000000-0000-0000-0000-000000000000"] + } + } + } + batch: { + description: "Event batching behavior." + required: false + type: object: options: { + max_bytes: { + description: """ + The maximum size of a batch that is processed by a sink. + + This is based on the uncompressed size of the batched events, before they are + serialized or compressed. + """ + required: false + type: uint: { + default: 10000000 + unit: "bytes" + } + } + max_events: { + description: "The maximum size of a batch before it is flushed." + required: false + type: uint: unit: "events" + } + timeout_secs: { + description: "The maximum age of a batch before it is flushed." + required: false + type: float: { + default: 1.0 + unit: "seconds" + } + } + } + } + dcr_immutable_id: { + description: """ + The [Data collection rule immutable ID][dcr_immutable_id] for the Data collection endpoint. + + [dcr_immutable_id]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview + """ + required: true + type: string: examples: ["dcr-000a00a000a00000a000000aa000a0aa"] + } + encoding: { + description: "Transformations to prepare an event for serialization." + required: false + type: object: options: { + except_fields: { + description: "List of fields that are excluded from the encoded event." + required: false + type: array: items: type: string: {} + } + only_fields: { + description: "List of fields that are included in the encoded event." + required: false + type: array: items: type: string: {} + } + timestamp_format: { + description: "Format used for timestamp fields." + required: false + type: string: enum: { + rfc3339: "Represent the timestamp as a RFC 3339 timestamp." 
+ unix: "Represent the timestamp as a Unix timestamp." + unix_float: "Represent the timestamp as a Unix timestamp in floating point." + unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds." + unix_ns: "Represent the timestamp as a Unix timestamp in nanoseconds." + unix_us: "Represent the timestamp as a Unix timestamp in microseconds." + } + } + } + } + endpoint: { + description: """ + The [Data collection endpoint URI][endpoint] associated with the Log Analytics workspace. + + [endpoint]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview + """ + required: true + type: string: examples: ["https://my-dce-5kyl.eastus-1.ingest.monitor.azure.com"] + } + request: { + description: """ + Middleware settings for outbound requests. + + Various settings can be configured, such as concurrency and rate limits, timeouts, and retry behavior. + + Note that the retry backoff policy follows the Fibonacci sequence. + """ + required: false + type: object: options: { + adaptive_concurrency: { + description: """ + Configuration of adaptive concurrency parameters. + + These parameters typically do not require changes from the default, and incorrect values can lead to meta-stable or + unstable performance and sink behavior. Proceed with caution. + """ + required: false + type: object: options: { + decrease_ratio: { + description: """ + The fraction of the current value to set the new concurrency limit when decreasing the limit. + + Valid values are greater than `0` and less than `1`. Smaller values cause the algorithm to scale back rapidly + when latency increases. + + **Note**: The new limit is rounded down after applying this ratio. + """ + required: false + type: float: default: 0.9 + } + ewma_alpha: { + description: """ + The weighting of new measurements compared to older measurements. + + Valid values are greater than `0` and less than `1`. + + ARC uses an exponentially weighted moving average (EWMA) of past RTT measurements as a reference to compare with + the current RTT. Smaller values cause this reference to adjust more slowly, which may be useful if a service has + unusually high response variability. + """ + required: false + type: float: default: 0.4 + } + initial_concurrency: { + description: """ + The initial concurrency limit to use. If not specified, the initial limit is 1 (no concurrency). + + Datadog recommends setting this value to your service's average limit if you're seeing that it takes a + long time to ramp up adaptive concurrency after a restart. You can find this value by looking at the + `adaptive_concurrency_limit` metric. + """ + required: false + type: uint: default: 1 + } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit does not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } + rtt_deviation_scale: { + description: """ + Scale of RTT deviations which are not considered anomalous. + + Valid values are greater than or equal to `0`, and reasonable values range from `1.0` to `3.0`. + + When calculating the past RTT average, a secondary “deviation” value is also computed that indicates how variable + those values are. That deviation is used when comparing the past RTT average to the current measurements, so we + can ignore increases in RTT that are within an expected range. This factor is used to scale up the deviation to + an appropriate range. 
Larger values cause the algorithm to ignore larger increases in the RTT. + """ + required: false + type: float: default: 2.5 + } + } + } + concurrency: { + description: """ + Configuration for outbound request concurrency. + + This can be set either to one of the below enum values or to a positive integer, which denotes + a fixed concurrency limit. + """ + required: false + type: { + string: { + default: "adaptive" + enum: { + adaptive: """ + Concurrency is managed by Vector's [Adaptive Request Concurrency][arc] feature. + + [arc]: https://vector.dev/docs/architecture/arc/ + """ + none: """ + A fixed concurrency of 1. + + Only one request can be outstanding at any given time. + """ + } + } + uint: {} + } + } + rate_limit_duration_secs: { + description: "The time window used for the `rate_limit_num` option." + required: false + type: uint: { + default: 1 + unit: "seconds" + } + } + rate_limit_num: { + description: "The maximum number of requests allowed within the `rate_limit_duration_secs` time window." + required: false + type: uint: { + default: 9223372036854775807 + unit: "requests" + } + } + retry_attempts: { + description: "The maximum number of retries to make for failed requests." + required: false + type: uint: { + default: 9223372036854775807 + unit: "retries" + } + } + retry_initial_backoff_secs: { + description: """ + The amount of time to wait before attempting the first retry for a failed request. + + After the first retry has failed, the Fibonacci sequence is used to select future backoffs. + """ + required: false + type: uint: { + default: 1 + unit: "seconds" + } + } + retry_jitter_mode: { + description: "The jitter mode to use for retry backoff behavior." + required: false + type: string: { + default: "Full" + enum: { + Full: """ + Full jitter. + + The random delay is anywhere from 0 up to the maximum current delay calculated by the backoff + strategy. + + Incorporating full jitter into your backoff strategy can greatly reduce the likelihood + of creating accidental denial of service (DoS) conditions against your own systems when + many clients are recovering from a failure state. + """ + None: "No jitter." + } + } + } + retry_max_duration_secs: { + description: "The maximum amount of time to wait between retries." + required: false + type: uint: { + default: 30 + unit: "seconds" + } + } + timeout_secs: { + description: """ + The time a request can take before being aborted. + + Datadog highly recommends that you do not lower this value below the service's internal timeout, as this could + create orphaned requests, pile on retries, and result in duplicate data downstream. + """ + required: false + type: uint: { + default: 60 + unit: "seconds" + } + } + } + } + stream_name: { + description: """ + The [Stream name][stream_name] for the Data collection rule. + + [stream_name]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview + """ + required: true + type: string: examples: ["Custom-MyTable"] + } + timestamp_field: { + description: """ + The destination field (column) for the timestamp. + + The setting of `log_schema.timestamp_key`, usually `timestamp`, is used as the source. + Most schemas use `TimeGenerated`, but some use `Timestamp` (legacy) or `EventStartTime` (ASIM); see [standard columns][std_columns].
+ + [std_columns]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/log-standard-columns#timegenerated + """ + required: false + type: string: { + default: "TimeGenerated" + examples: ["EventStartTime", "Timestamp"] + } + } + tls: { + description: "TLS configuration." + required: false + type: object: options: { + alpn_protocols: { + description: """ + Sets the list of supported ALPN protocols. + + Declare the supported ALPN protocols, which are used during negotiation with a peer. They are prioritized in the order + that they are defined. + """ + required: false + type: array: items: type: string: examples: ["h2"] + } + ca_file: { + description: """ + Absolute path to an additional CA certificate file. + + The certificate must be in the DER or PEM (X.509) format. Additionally, the certificate can be provided as an inline string in PEM format. + """ + required: false + type: string: examples: ["/path/to/certificate_authority.crt"] + } + crt_file: { + description: """ + Absolute path to a certificate file used to identify this server. + + The certificate must be in DER, PEM (X.509), or PKCS#12 format. Additionally, the certificate can be provided as + an inline string in PEM format. + + If this is set _and_ is not a PKCS#12 archive, `key_file` must also be set. + """ + required: false + type: string: examples: ["/path/to/host_certificate.crt"] + } + key_file: { + description: """ + Absolute path to a private key file used to identify this server. + + The key must be in DER or PEM (PKCS#8) format. Additionally, the key can be provided as an inline string in PEM format. + """ + required: false + type: string: examples: ["/path/to/host_certificate.key"] + } + key_pass: { + description: """ + Passphrase used to unlock the encrypted key file. + + This has no effect unless `key_file` is set. + """ + required: false + type: string: examples: ["${KEY_PASS_ENV_VAR}", "PassWord1"] + } + server_name: { + description: """ + Server name to use when using Server Name Indication (SNI). + + Only relevant for outgoing connections. + """ + required: false + type: string: examples: ["www.example.com"] + } + verify_certificate: { + description: """ + Enables certificate verification. For components that create a server, this requires that the + client connections have a valid client certificate. For components that initiate requests, + this validates that the upstream has a valid certificate. + + If enabled, certificates must not be expired and must be issued by a trusted + issuer. This verification operates in a hierarchical manner, checking that the leaf certificate (the + certificate presented by the client/server) is not only valid, but that the issuer of that certificate is also valid, and + so on, until the verification process reaches a root certificate. + + Do NOT set this to `false` unless you understand the risks of not verifying the validity of certificates. + """ + required: false + type: bool: {} + } + verify_hostname: { + description: """ + Enables hostname verification. + + If enabled, the hostname used to connect to the remote host must be present in the TLS certificate presented by + the remote host, either as the Common Name or as an entry in the Subject Alternative Name extension. + + Only relevant for outgoing connections. + + Do NOT set this to `false` unless you understand the risks of not verifying the remote hostname. + """ + required: false + type: bool: {} + } + } + } + token_scope: { + description: """ + [Token scope][token_scope] for dedicated Azure regions. 
+ + [token_scope]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview + """ + required: false + type: string: { + default: "https://monitor.azure.com/.default" + examples: ["https://monitor.azure.us/.default", "https://monitor.azure.cn/.default"] + } + } +} diff --git a/website/cue/reference/services/azure_logs_ingestion.cue b/website/cue/reference/services/azure_logs_ingestion.cue new file mode 100644 index 0000000000000..45f76912ee66f --- /dev/null +++ b/website/cue/reference/services/azure_logs_ingestion.cue @@ -0,0 +1,10 @@ +package metadata + +services: azure_logs_ingestion: { + name: "Azure Logs Ingestion" + thing: "an \(name) account" + url: urls.azure_monitor + versions: null + + description: "[Azure Monitor](\(urls.azure_monitor)) is a service in Azure that provides performance and availability monitoring for applications and services in Azure, other cloud environments, or on-premises. Azure Monitor collects data from multiple sources into a common data platform (Log Analytics) where it can be analyzed for trends and anomalies." +} diff --git a/website/cue/reference/urls.cue b/website/cue/reference/urls.cue index 079d8378e2696..995d55ae4d977 100644 --- a/website/cue/reference/urls.cue +++ b/website/cue/reference/urls.cue @@ -95,6 +95,8 @@ urls: { azure_blob_endpoints: "https://docs.microsoft.com/en-us/rest/api/storageservices/blob-service-rest-api" azure_monitor: "https://azure.microsoft.com/en-us/services/monitor/" azure_monitor_logs_endpoints: "https://docs.microsoft.com/en-us/rest/api/monitor/" + azure_monitor_data_collector_deprecation: "https://learn.microsoft.com/previous-versions/azure/azure-monitor/logs/data-collector-api" + azure_logs_ingestion_endpoints: "https://learn.microsoft.com/azure/azure-monitor/logs/logs-ingestion-api-overview" base16: "\(wikipedia)/wiki/Hexadecimal" base64: "\(wikipedia)/wiki/Base64" base64_padding: "\(wikipedia)/wiki/Base64#Output_padding"