Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions opentelemetry-exporter-geneva/geneva-uploader-ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,21 @@ pub unsafe extern "C" fn geneva_client_new(

// Auth method conversion
let auth_method = match config.auth_method {
// 0 => Managed Identity (default to system-assigned when coming from FFI for now)
0 => AuthMethod::SystemManagedIdentity,
0 => {
// Unified: Workload Identity (AKS) or System Managed Identity (VM)
// Auto-detect based on environment
if std::env::var("AZURE_FEDERATED_TOKEN_FILE").is_ok() {
// Workload Identity: azure_identity crate reads AZURE_CLIENT_ID, AZURE_TENANT_ID,
// and AZURE_FEDERATED_TOKEN_FILE automatically from environment
let resource = std::env::var("GENEVA_WORKLOAD_IDENTITY_RESOURCE")
.unwrap_or_else(|_| "https://monitor.azure.com".to_string());

AuthMethod::WorkloadIdentity { resource }
} else {
AuthMethod::SystemManagedIdentity
}
}

1 => {
// Certificate authentication: read fields from tagged union
let cert = unsafe { config.auth.cert };
Expand Down
5 changes: 3 additions & 2 deletions opentelemetry-exporter-geneva/geneva-uploader/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0", features = ["raw_value"] }
uuid = { version = "1.0", features = ["v4"] }
# TODO - support both native-tls and rustls
reqwest = { version = "0.12", features = ["native-tls", "native-tls-alpn"], default-features = false}
native-tls = "0.2"
# http2 feature is required by hyper-util even when using http1_only()
reqwest = { version = "0.12", features = ["native-tls", "native-tls-alpn", "http2"], default-features = false}
native-tls = "0.2"
thiserror = "2.0"
chrono = "0.4"
url = "2.2"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ impl GenevaClient {
}
}
AuthMethod::Certificate { .. } => {}
AuthMethod::WorkloadIdentity { .. } => {}
#[cfg(feature = "mock_auth")]
AuthMethod::MockAuth => {}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Geneva Config Client with TLS (PKCS#12) and TODO: Managed Identity support
// Geneva Config Client with TLS (PKCS#12) and Azure Workload Identity support

use base64::{engine::general_purpose, Engine as _};
use reqwest::{
Expand All @@ -18,15 +18,19 @@ use std::fs;
use std::path::PathBuf;
use std::sync::RwLock;

// Azure Identity imports for MSI authentication
// Azure Identity imports for MSI and Workload Identity authentication
use azure_core::credentials::TokenCredential;
use azure_identity::{ManagedIdentityCredential, ManagedIdentityCredentialOptions, UserAssignedId};
use azure_identity::{
ManagedIdentityCredential, ManagedIdentityCredentialOptions, UserAssignedId,
WorkloadIdentityCredential,
};

/// Authentication methods for the Geneva Config Client.
///
/// The client supports two authentication methods:
/// The client supports three authentication methods:
/// - Certificate-based authentication using PKCS#12 (.p12) files
/// - Managed Identity (Azure) - planned for future implementation
/// - Azure Workload Identity (Federated Identity) for Kubernetes workloads
/// - Mock authentication for testing
///
/// # Certificate Format
/// Certificates should be in PKCS#12 (.p12) format for client TLS authentication.
Expand Down Expand Up @@ -65,6 +69,19 @@ pub enum AuthMethod {
UserManagedIdentityByObjectId { object_id: String },
/// User-assigned managed identity by resource ID
UserManagedIdentityByResourceId { resource_id: String },
/// Azure Workload Identity authentication (Federated Identity for Kubernetes)
///
/// The following environment variables must be set in the pod spec:
/// * `AZURE_CLIENT_ID` - Azure AD Application (client) ID (set explicitly in pod env)
/// * `AZURE_TENANT_ID` - Azure AD Tenant ID (set explicitly in pod env)
/// * `AZURE_FEDERATED_TOKEN_FILE` - Path to service account token file (auto-injected by workload identity webhook)
///
/// These variables are automatically read by the Azure Identity SDK at runtime.
///
/// # Arguments
/// * `resource` - Azure AD resource URI for token acquisition
/// (e.g., <https://monitor.azure.com> for Azure Public Cloud)
WorkloadIdentity { resource: String },
#[cfg(feature = "mock_auth")]
MockAuth, // No authentication, used for testing purposes
}
Expand All @@ -78,6 +95,8 @@ pub(crate) enum GenevaConfigClientError {
JwtTokenError(String),
#[error("Certificate error: {0}")]
Certificate(String),
#[error("Workload Identity authentication error: {0}")]
WorkloadIdentityAuth(String),
#[error("MSI authentication error: {0}")]
MsiAuth(String),

Expand Down Expand Up @@ -257,6 +276,10 @@ impl GenevaConfigClient {
.map_err(|e| GenevaConfigClientError::Certificate(e.to_string()))?;
client_builder = client_builder.use_preconfigured_tls(tls_connector);
}
AuthMethod::WorkloadIdentity { .. } => {
// No special HTTP client configuration needed for Workload Identity
// Authentication is done via Bearer token in request headers
}
AuthMethod::SystemManagedIdentity
| AuthMethod::UserManagedIdentity { .. }
| AuthMethod::UserManagedIdentityByObjectId { .. }
Expand All @@ -276,13 +299,14 @@ impl GenevaConfigClient {
let version_str = format!("Ver{0}v0", config.config_major_version);

// Use different API endpoints based on authentication method
// Certificate auth uses "api", MSI auth uses "userapi"
// Certificate auth uses "api", MSI auth and Workload Identity use "userapi"
let api_path = match &config.auth_method {
AuthMethod::Certificate { .. } => "api",
AuthMethod::SystemManagedIdentity
| AuthMethod::UserManagedIdentity { .. }
| AuthMethod::UserManagedIdentityByObjectId { .. }
| AuthMethod::UserManagedIdentityByResourceId { .. } => "userapi",
| AuthMethod::UserManagedIdentityByResourceId { .. }
| AuthMethod::WorkloadIdentity { .. } => "userapi",
#[cfg(feature = "mock_auth")]
AuthMethod::MockAuth => "api", // treat mock like certificate path for URL shape
};
Expand Down Expand Up @@ -329,6 +353,53 @@ impl GenevaConfigClient {
headers
}

/// Get Azure AD token using Workload Identity (Federated Identity)
///
/// Reads AZURE_CLIENT_ID, AZURE_TENANT_ID, and AZURE_FEDERATED_TOKEN_FILE from environment variables.
/// In Kubernetes:
/// - AZURE_CLIENT_ID and AZURE_TENANT_ID must be set explicitly in the pod spec
/// - AZURE_FEDERATED_TOKEN_FILE is auto-injected by the workload identity webhook
async fn get_workload_identity_token(&self) -> Result<String> {
let resource =
match &self.config.auth_method {
AuthMethod::WorkloadIdentity { resource } => resource,
_ => return Err(GenevaConfigClientError::WorkloadIdentityAuth(
"get_workload_identity_token called but auth method is not WorkloadIdentity"
.to_string(),
)),
};

// TODO: Extract scope generation logic into helper function shared with get_msi_token()
let base = resource.trim_end_matches("/.default").trim_end_matches('/');
let mut scope_candidates: Vec<String> = vec![format!("{base}/.default"), base.to_string()];
if !base.ends_with('/') {
scope_candidates.push(format!("{base}/"));
}

// TODO: Consider caching WorkloadIdentityCredential if profiling shows credential creation overhead
// Pass None to let azure_identity crate read AZURE_CLIENT_ID, AZURE_TENANT_ID,
// and AZURE_FEDERATED_TOKEN_FILE from environment variables automatically
let credential = WorkloadIdentityCredential::new(None).map_err(|e| {
GenevaConfigClientError::WorkloadIdentityAuth(format!(
"Failed to create WorkloadIdentityCredential. Ensure AZURE_CLIENT_ID, AZURE_TENANT_ID, and AZURE_FEDERATED_TOKEN_FILE environment variables are set: {e}"
))
})?;

let mut last_err: Option<String> = None;
for scope in &scope_candidates {
match credential.get_token(&[scope.as_str()], None).await {
Ok(token) => return Ok(token.token.secret().to_string()),
Err(e) => last_err = Some(e.to_string()),
}
}

let detail = last_err.unwrap_or_else(|| "no error detail".into());
Err(GenevaConfigClientError::WorkloadIdentityAuth(format!(
"Workload Identity token acquisition failed. Scopes tried: {scopes}. Last error: {detail}",
scopes = scope_candidates.join(", ")
)))
}

/// Get MSI token for GCS authentication
async fn get_msi_token(&self) -> Result<String> {
let resource = self.config.msi_resource.as_ref().ok_or_else(|| {
Expand All @@ -337,17 +408,13 @@ impl GenevaConfigClient {
)
})?;

// Normalize resource (strip trailing "/.default" if provided by user)
// TODO: Extract scope generation logic into helper function shared with get_workload_identity_token()
let base = resource.trim_end_matches("/.default").trim_end_matches('/');

// Candidate scopes tried with Azure Identity
let mut scope_candidates: Vec<String> = vec![format!("{base}/.default"), base.to_string()];
// Add variant with trailing slash if not already present
if !base.ends_with('/') {
scope_candidates.push(format!("{base}/"));
}

// Build credential based on selector
let user_assigned_id = match &self.config.auth_method {
AuthMethod::SystemManagedIdentity => None,
AuthMethod::UserManagedIdentity { client_id } => {
Expand All @@ -367,6 +434,7 @@ impl GenevaConfigClient {
}
};

// TODO: Consider caching ManagedIdentityCredential if profiling shows credential creation overhead
let options = ManagedIdentityCredentialOptions {
user_assigned_id,
..Default::default()
Expand All @@ -382,6 +450,7 @@ impl GenevaConfigClient {
Err(e) => last_err = Some(e.to_string()),
}
}

let detail = last_err.unwrap_or_else(|| "no error detail".into());
Err(GenevaConfigClientError::MsiAuth(format!(
"Managed Identity token acquisition failed. Scopes tried: {scopes}. Last error: {detail}. IMDS fallback intentionally disabled.",
Expand Down Expand Up @@ -506,8 +575,8 @@ impl GenevaConfigClient {

/// Internal method that actually fetches data from Geneva Config Service
async fn fetch_ingestion_info(&self) -> Result<(IngestionGatewayInfo, MonikerInfo)> {
let tag_id = Uuid::new_v4().to_string(); //TODO - uuid is costly, check if counter is enough?
let mut url = String::with_capacity(self.precomputed_url_prefix.len() + 50); // Pre-allocate with reasonable capacity
let tag_id = Uuid::new_v4().to_string(); // TODO: consider cheaper counter if perf-critical
let mut url = String::with_capacity(self.precomputed_url_prefix.len() + 50);
write!(&mut url, "{}&TagId={tag_id}", self.precomputed_url_prefix).map_err(|e| {
GenevaConfigClientError::InternalError(format!("Failed to write URL: {e}"))
})?;
Expand All @@ -518,48 +587,44 @@ impl GenevaConfigClient {

request = request.header("x-ms-client-request-id", req_id);

// Add MSI authentication for managed identity auth method
// Add appropriate authentication header
match &self.config.auth_method {
AuthMethod::WorkloadIdentity { .. } => {
let token = self.get_workload_identity_token().await?;
request = request.header(AUTHORIZATION, format!("Bearer {}", token));
}
AuthMethod::SystemManagedIdentity
| AuthMethod::UserManagedIdentity { .. }
| AuthMethod::UserManagedIdentityByObjectId { .. }
| AuthMethod::UserManagedIdentityByResourceId { .. } => {
let msi_token = self.get_msi_token().await?;
request = request.header(AUTHORIZATION, format!("Bearer {}", msi_token));
let token = self.get_msi_token().await?;
request = request.header(AUTHORIZATION, format!("Bearer {}", token));
}
AuthMethod::Certificate { .. } => { /* mTLS only */ }
#[cfg(feature = "mock_auth")]
AuthMethod::MockAuth => { /* no auth header */ }
}

// Log the request details for debugging
// Send HTTP request
let response = match request.send().await {
Ok(response) => response,
Err(e) => {
return Err(GenevaConfigClientError::Http(e));
}
Ok(resp) => resp,
Err(e) => return Err(GenevaConfigClientError::Http(e)),
};

// Check if the response is successful
let status = response.status();
let body = response.text().await?;

if status.is_success() {
let parsed = match serde_json::from_str::<GenevaResponse>(&body) {
Ok(response) => response,
Err(e) => {
return Err(GenevaConfigClientError::AuthInfoNotFound(format!(
"Failed to parse response: {e}"
)));
}
};
let parsed = serde_json::from_str::<GenevaResponse>(&body).map_err(|e| {
GenevaConfigClientError::AuthInfoNotFound(format!("Failed to parse response: {e}"))
})?;

for account in parsed.storage_account_keys {
if account.is_primary_moniker && account.account_moniker_name.contains("diag") {
let moniker_info = MonikerInfo {
name: account.account_moniker_name,
account_group: account.account_group_name,
};

return Ok((parsed.ingestion_gateway_info, moniker_info));
}
}
Expand Down Expand Up @@ -610,16 +675,21 @@ fn extract_endpoint_from_token(token: &str) -> Result<String> {
_ => payload.to_string(),
};

// Decode the Base64-encoded payload into raw bytes with a more tolerant approach.
// Decode the Base64-encoded payload into raw bytes.
// Try URL-safe (with and without padding), then fall back to standard Base64.
let decoded = match general_purpose::URL_SAFE_NO_PAD.decode(&payload) {
Ok(b) => b,
Err(e_url) => match general_purpose::STANDARD.decode(&payload) {
Err(e_url_no_pad) => match general_purpose::URL_SAFE.decode(&payload) {
Ok(b) => b,
Err(e_std) => {
return Err(GenevaConfigClientError::JwtTokenError(format!(
"Failed to decode JWT (url_safe and standard): url_err={e_url}; std_err={e_std}"
)))
}
Err(e_url_pad) => match general_purpose::STANDARD.decode(&payload) {
Ok(b) => b,
Err(e_std) => {
return Err(GenevaConfigClientError::JwtTokenError(format!(
"Failed to decode JWT (URL_SAFE_NO_PAD, URL_SAFE, and STANDARD): \
no_pad_err={e_url_no_pad}; pad_err={e_url_pad}; std_err={e_std}"
)))
}
},
},
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,19 @@ mod tests {
namespace: "ns".to_string(),
region: "region".to_string(),
config_major_version: 1,
auth_method: AuthMethod::SystemManagedIdentity,
auth_method: AuthMethod::WorkloadIdentity {
resource: "https://monitor.azure.com".to_string(),
},
msi_resource: None,
};

assert_eq!(config.environment, "env");
assert_eq!(config.account, "acct");
assert!(matches!(
config.auth_method,
AuthMethod::SystemManagedIdentity
));

match config.auth_method {
AuthMethod::WorkloadIdentity { .. } => {}
_ => panic!("expected WorkloadIdentity variant"),
}
}

fn generate_self_signed_p12() -> (NamedTempFile, String) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Dockerfile for Geneva Uploader Workload Identity Test
#
# This Dockerfile must be built from the repository root to access the workspace:
# cd /path/to/opentelemetry-rust-contrib
# docker build -f opentelemetry-exporter-geneva/opentelemetry-exporter-geneva/examples/Dockerfile -t geneva-uploader-test:latest .
#
# Or using ACR:
# az acr build --registry <registry-name> --image geneva-uploader-test:latest \
# --file opentelemetry-exporter-geneva/opentelemetry-exporter-geneva/examples/Dockerfile .

FROM rust:1.85-slim AS builder

# Install build dependencies
RUN apt-get update && apt-get install -y \
pkg-config \
libssl-dev \
&& rm -rf /var/lib/apt/lists/*

WORKDIR /app

# Copy the entire workspace from repository root
COPY . .

# Build the example
WORKDIR /app/opentelemetry-exporter-geneva/opentelemetry-exporter-geneva
RUN cargo build --release --example basic_workload_identity_test

# Runtime stage
FROM debian:bookworm-slim

# Install runtime dependencies
RUN apt-get update && apt-get install -y \
ca-certificates \
libssl3 \
&& rm -rf /var/lib/apt/lists/*

# Copy the binary
COPY --from=builder /app/target/release/examples/basic_workload_identity_test /usr/local/bin/geneva-uploader-test

# Run as non-root user
RUN useradd -m -u 1000 appuser
USER appuser

ENTRYPOINT ["/usr/local/bin/geneva-uploader-test"]

Loading
Loading