Skip to content

Commit 3ade2f1

Browse files
committed
Working azure data explorer sink
1 parent 7cd3395 commit 3ade2f1

File tree

12 files changed

+1581
-0
lines changed

12 files changed

+1581
-0
lines changed

Cargo.lock

Lines changed: 18 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,7 @@ aws-smithy-types = { version = "1.2.11", default-features = false, features = ["
296296

297297
# Azure
298298
azure_core = { version = "0.30", features = ["reqwest", "hmac_openssl"], optional = true }
299+
azure_identity = { version = "0.30", optional = true }
299300

300301
# Azure Storage
301302
azure_storage_blob = { version = "0.7", optional = true }
@@ -785,6 +786,7 @@ sinks-logs = [
785786
"sinks-aws_sqs",
786787
"sinks-axiom",
787788
"sinks-azure_blob",
789+
"sinks-azure_data_explorer",
788790
"sinks-azure_monitor_logs",
789791
"sinks-blackhole",
790792
"sinks-chronicle",
@@ -852,6 +854,7 @@ sinks-aws_sqs = ["aws-core", "dep:aws-sdk-sqs"]
852854
sinks-aws_sns = ["aws-core", "dep:aws-sdk-sns"]
853855
sinks-axiom = ["sinks-http"]
854856
sinks-azure_blob = ["dep:azure_core", "dep:azure_storage_blob"]
857+
sinks-azure_data_explorer = ["dep:azure_core", "dep:azure_identity", "dep:base64"]
855858
sinks-azure_monitor_logs = []
856859
sinks-blackhole = []
857860
sinks-chronicle = []
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
//! Azure Entra ID authentication for Azure Data Explorer.
2+
//!
3+
//! Uses [`azure_identity::ClientSecretCredential`] (from the official Azure SDK
4+
//! for Rust) for service-principal client-credentials authentication, rather
5+
//! than a hand-rolled OAuth2 flow.
6+
7+
use std::sync::Arc;
8+
9+
use azure_core::credentials::{Secret, TokenCredential};
10+
use azure_identity::ClientSecretCredential;
11+
use vector_lib::sensitive_string::SensitiveString;
12+
13+
/// OAuth2 scope requested when acquiring tokens for the Azure Data Explorer
/// (Kusto) API.
const KUSTO_SCOPE: &str = "https://kusto.kusto.windows.net/.default";
15+
16+
// ---------------------------------------------------------------------------
17+
// Internal trait: allows swapping in a mock for tests without needing to
18+
// construct `azure_core::credentials::AccessToken` (which requires the `time`
19+
// crate's `OffsetDateTime`).
20+
// ---------------------------------------------------------------------------
21+
22+
#[async_trait::async_trait]
23+
trait TokenProvider: Send + Sync {
24+
async fn get_bearer_token(&self) -> crate::Result<String>;
25+
}
26+
27+
/// Production token provider backed by [`ClientSecretCredential`].
28+
struct EntraTokenProvider {
29+
credential: Arc<ClientSecretCredential>,
30+
}
31+
32+
#[async_trait::async_trait]
33+
impl TokenProvider for EntraTokenProvider {
34+
async fn get_bearer_token(&self) -> crate::Result<String> {
35+
let access_token = self
36+
.credential
37+
.get_token(&[KUSTO_SCOPE], None)
38+
.await
39+
.map_err(|e| format!("Failed to acquire Azure Entra token: {e}"))?;
40+
41+
Ok(access_token.token.secret().to_string())
42+
}
43+
}
44+
45+
// ---------------------------------------------------------------------------
46+
// Public auth wrapper
47+
// ---------------------------------------------------------------------------
48+
49+
/// Azure Entra ID token provider for Azure Data Explorer.
50+
///
51+
/// Wraps [`azure_identity::ClientSecretCredential`] to acquire Bearer tokens
52+
/// via the OAuth2 client-credentials flow. Token caching and refresh are
53+
/// handled internally by the Azure SDK.
54+
#[derive(Clone)]
55+
pub(super) struct AzureDataExplorerAuth {
56+
provider: Arc<dyn TokenProvider>,
57+
}
58+
59+
impl AzureDataExplorerAuth {
60+
/// Creates a new auth provider backed by [`ClientSecretCredential`].
61+
pub(super) fn new(
62+
tenant_id: &str,
63+
client_id: String,
64+
client_secret: SensitiveString,
65+
) -> crate::Result<Self> {
66+
let secret = Secret::from(client_secret.inner().to_string());
67+
let credential = ClientSecretCredential::new(tenant_id, client_id, secret, None)
68+
.map_err(|e| format!("Failed to create Azure credential: {e}"))?;
69+
70+
Ok(Self {
71+
provider: Arc::new(EntraTokenProvider { credential }),
72+
})
73+
}
74+
75+
/// Creates a mock auth provider that always returns the given token.
76+
/// For use in tests only.
77+
#[cfg(test)]
78+
pub(super) fn mock(token: impl Into<String>) -> Self {
79+
Self {
80+
provider: Arc::new(MockTokenProvider {
81+
token: token.into(),
82+
}),
83+
}
84+
}
85+
86+
/// Returns a valid Bearer access token string.
87+
pub(super) async fn get_token(&self) -> crate::Result<String> {
88+
self.provider.get_bearer_token().await
89+
}
90+
}
91+
92+
// ---------------------------------------------------------------------------
93+
// Test-only mock
94+
// ---------------------------------------------------------------------------
95+
96+
/// Test-only [`TokenProvider`] that hands out one fixed token.
#[cfg(test)]
struct MockTokenProvider {
    /// Token returned by every call.
    token: String,
}
100+
101+
#[cfg(test)]
#[async_trait::async_trait]
impl TokenProvider for MockTokenProvider {
    /// Always succeeds with a copy of the stored token.
    async fn get_bearer_token(&self) -> crate::Result<String> {
        let token = self.token.clone();
        Ok(token)
    }
}
Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
//! Configuration for the `azure_data_explorer` sink.
2+
//!
3+
//! Uses **queued ingestion** (blob upload + queue notification), matching the
4+
//! Fluent Bit `out_azure_kusto` plugin.
5+
6+
use futures::FutureExt;
7+
use vector_lib::{configurable::configurable_component, sensitive_string::SensitiveString};
8+
use vrl::value::Kind;
9+
10+
use super::{
11+
auth::AzureDataExplorerAuth,
12+
encoder::AzureDataExplorerEncoder,
13+
request_builder::AzureDataExplorerRequestBuilder,
14+
resources::ResourceManager,
15+
service::{AzureDataExplorerService, QueuedIngestConfig},
16+
sink::AzureDataExplorerSink,
17+
};
18+
use crate::{
19+
http::HttpClient,
20+
sinks::{
21+
prelude::*,
22+
util::{BatchConfig, http::http_response_retry_logic},
23+
},
24+
};
25+
26+
/// Configuration for the `azure_data_explorer` sink.
27+
#[configurable_component(sink(
28+
"azure_data_explorer",
29+
"Deliver log events to Azure Data Explorer via queued ingestion."
30+
))]
31+
#[derive(Clone, Debug)]
32+
pub struct AzureDataExplorerConfig {
33+
/// The Kusto cluster's **ingestion** endpoint URL.
34+
///
35+
/// This is the `ingest-` prefixed URL, e.g.
36+
/// `https://ingest-mycluster.eastus.kusto.windows.net`.
37+
#[configurable(metadata(
38+
docs::examples = "https://ingest-mycluster.eastus.kusto.windows.net",
39+
))]
40+
#[configurable(validation(format = "uri"))]
41+
pub(super) ingestion_endpoint: String,
42+
43+
/// The name of the target database.
44+
#[configurable(metadata(docs::examples = "my_database"))]
45+
pub(super) database: String,
46+
47+
/// The name of the target table inside the database.
48+
#[configurable(metadata(docs::examples = "my_table"))]
49+
pub(super) table: String,
50+
51+
/// Azure Entra ID (Azure AD) tenant ID for service-principal authentication.
52+
#[configurable(metadata(docs::examples = "${AZURE_TENANT_ID}"))]
53+
pub(super) tenant_id: String,
54+
55+
/// Azure Entra ID application (client) ID.
56+
#[configurable(metadata(docs::examples = "${AZURE_CLIENT_ID}"))]
57+
pub(super) client_id: String,
58+
59+
/// Azure Entra ID application client secret.
60+
#[configurable(metadata(docs::examples = "${AZURE_CLIENT_SECRET}"))]
61+
pub(super) client_secret: SensitiveString,
62+
63+
/// Optional ingestion mapping reference name.
64+
///
65+
/// When set, the value is passed in the ingestion message's
66+
/// `jsonMappingReference` property.
67+
#[serde(default)]
68+
#[configurable(metadata(docs::examples = "my_mapping"))]
69+
pub(super) mapping_reference: Option<String>,
70+
71+
#[configurable(derived)]
72+
#[serde(default)]
73+
pub(super) batch: BatchConfig<AzureDataExplorerDefaultBatchSettings>,
74+
75+
#[configurable(derived)]
76+
#[serde(default)]
77+
pub(super) request: TowerRequestConfig,
78+
79+
#[configurable(derived)]
80+
#[serde(default, skip_serializing_if = "crate::serde::is_default")]
81+
pub(super) encoding: Transformer,
82+
83+
/// The compression algorithm to use.
84+
///
85+
/// When enabled, the JSONL payload is gzip-compressed before blob upload
86+
/// and the blob name ends with `.multijson.gz`.
87+
#[configurable(derived)]
88+
#[serde(default = "Compression::gzip_default")]
89+
pub(super) compression: Compression,
90+
91+
#[configurable(derived)]
92+
#[serde(
93+
default,
94+
deserialize_with = "crate::serde::bool_or_struct",
95+
skip_serializing_if = "crate::serde::is_default"
96+
)]
97+
pub(super) acknowledgements: AcknowledgementsConfig,
98+
}
99+
100+
#[derive(Clone, Copy, Debug, Default)]
101+
pub(super) struct AzureDataExplorerDefaultBatchSettings;
102+
103+
impl SinkBatchSettings for AzureDataExplorerDefaultBatchSettings {
    /// Default event-count limit per batch: 1,000 events.
    const MAX_EVENTS: Option<usize> = Some(1_000);
    /// Default size limit per batch: 4 MB (4,000,000 bytes).
    const MAX_BYTES: Option<usize> = Some(4_000_000);
    /// Default batch timeout, in seconds.
    const TIMEOUT_SECS: f64 = 30.0;
}
108+
109+
impl GenerateConfig for AzureDataExplorerConfig {
110+
fn generate_config() -> toml::Value {
111+
toml::from_str(
112+
r#"ingestion_endpoint = "https://ingest-mycluster.eastus.kusto.windows.net"
113+
database = "my_database"
114+
table = "my_table"
115+
tenant_id = "${AZURE_TENANT_ID}"
116+
client_id = "${AZURE_CLIENT_ID}"
117+
client_secret = "${AZURE_CLIENT_SECRET}""#,
118+
)
119+
.unwrap()
120+
}
121+
}
122+
123+
#[async_trait::async_trait]
124+
#[typetag::serde(name = "azure_data_explorer")]
125+
impl SinkConfig for AzureDataExplorerConfig {
126+
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
127+
let batch_settings = self.batch.validate()?.into_batcher_settings()?;
128+
129+
let request_builder = AzureDataExplorerRequestBuilder {
130+
encoder: AzureDataExplorerEncoder {
131+
transformer: self.encoding.clone(),
132+
},
133+
compression: self.compression,
134+
};
135+
136+
let client = HttpClient::new(None, cx.proxy())?;
137+
138+
let auth = AzureDataExplorerAuth::new(
139+
&self.tenant_id,
140+
self.client_id.clone(),
141+
self.client_secret.clone(),
142+
)?;
143+
144+
// Resource manager handles .get ingestion resources + identity token caching
145+
let resource_manager = ResourceManager::new(
146+
auth.clone(),
147+
client.clone(),
148+
self.ingestion_endpoint.clone(),
149+
);
150+
151+
let queued_config = QueuedIngestConfig {
152+
database: self.database.clone(),
153+
table: self.table.clone(),
154+
mapping_reference: self.mapping_reference.clone(),
155+
compression: self.compression,
156+
};
157+
158+
let service =
159+
AzureDataExplorerService::new(client.clone(), resource_manager.clone(), queued_config);
160+
161+
let request_limits = self.request.into_settings();
162+
163+
let service = ServiceBuilder::new()
164+
.settings(request_limits, http_response_retry_logic())
165+
.service(service);
166+
167+
let sink = AzureDataExplorerSink::new(service, batch_settings, request_builder);
168+
169+
let healthcheck = healthcheck(self.ingestion_endpoint.clone(), auth).boxed();
170+
171+
Ok((VectorSink::from_event_streamsink(sink), healthcheck))
172+
}
173+
174+
fn input(&self) -> Input {
175+
let requirement = Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
176+
Input::log().with_schema_requirement(requirement)
177+
}
178+
179+
fn acknowledgements(&self) -> &AcknowledgementsConfig {
180+
&self.acknowledgements
181+
}
182+
}
183+
184+
/// Validates credentials and ingestion endpoint reachability by:
185+
/// 1. Acquiring an Entra token (validates service-principal credentials)
186+
/// 2. Executing a lightweight `.show version` management command
187+
async fn healthcheck(
188+
ingestion_endpoint: String,
189+
auth: AzureDataExplorerAuth,
190+
) -> crate::Result<()> {
191+
let token = auth.get_token().await?;
192+
193+
let mgmt_uri = format!(
194+
"{}/v1/rest/mgmt",
195+
ingestion_endpoint.trim_end_matches('/')
196+
);
197+
198+
let body = serde_json::json!({
199+
"csl": ".show version",
200+
"db": "NetDefaultDB"
201+
});
202+
let body_bytes = bytes::Bytes::from(serde_json::to_vec(&body)?);
203+
204+
let request = http::Request::post(&mgmt_uri)
205+
.header("Authorization", format!("Bearer {}", token))
206+
.header("Content-Type", "application/json")
207+
.body(hyper::Body::from(body_bytes))?;
208+
209+
let client = HttpClient::new(None, &Default::default())?;
210+
let response = client.send(request).await?;
211+
let status = response.status();
212+
213+
if status.is_success() {
214+
Ok(())
215+
} else if status == http::StatusCode::UNAUTHORIZED || status == http::StatusCode::FORBIDDEN {
216+
Err(format!(
217+
"Azure Data Explorer authentication failed (HTTP {}). \
218+
Verify tenant_id, client_id, and client_secret.",
219+
status
220+
)
221+
.into())
222+
} else {
223+
let body = http_body::Body::collect(response.into_body())
224+
.await?
225+
.to_bytes();
226+
let body_str = String::from_utf8_lossy(&body);
227+
Err(format!(
228+
"Azure Data Explorer healthcheck failed: HTTP {} - {}",
229+
status, body_str
230+
)
231+
.into())
232+
}
233+
}

0 commit comments

Comments
 (0)