Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions infra/helm/bud/templates/budgateway.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ data:
[object_storage]
type = "disabled"

[gateway.otlp_proxy]
enabled = true

[gateway.export.otlp.traces]
enabled = true

Expand Down
1 change: 1 addition & 0 deletions infra/helm/bud/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ microservices:
image: budstudio/budgateway:0.6.0
env:
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: "http://{{ $.Release.Name }}-otel-collector:4317"
OTEL_EXPORTER_OTLP_HTTP_ENDPOINT: "http://{{ $.Release.Name }}-otel-collector:4318"
nodeSelector: {}
affinity: {}
# Autoscaling configuration for budgateway
Expand Down
3 changes: 3 additions & 0 deletions services/budgateway/config/tensorzero.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ enabled = false
[gateway.observability]
enabled = false

[gateway.otlp_proxy]
enabled = true

[gateway.analytics]
enabled = true
# GeoIP database for location data enrichment
Expand Down
44 changes: 42 additions & 2 deletions services/budgateway/gateway/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use tensorzero_internal::analytics_middleware::{
analytics_middleware, attach_analytics_batcher_middleware, attach_clickhouse_middleware,
attach_geoip_middleware, attach_ua_parser_middleware,
};
use tensorzero_internal::auth::require_api_key;
use tensorzero_internal::auth::{require_api_key, require_api_key_telemetry};
use tensorzero_internal::blocking_middleware::{attach_blocking_manager, blocking_middleware};
use tensorzero_internal::clickhouse::ClickHouseConnectionInfo;
use tensorzero_internal::config_parser::Config;
Expand Down Expand Up @@ -370,6 +370,35 @@ async fn main() {
// This extracts incoming traceparent headers for distributed tracing
let openai_routes = openai_routes.apply_otel_http_trace_layer();

// OTLP telemetry proxy routes (conditionally enabled)
let otlp_routes = if app_state.config.gateway.otlp_proxy.enabled {
tracing::info!("OTLP proxy routes enabled");
let routes = Router::new()
.route(
"/v1/traces",
post(endpoints::otlp_proxy::otlp_proxy_handler),
)
.route(
"/v1/metrics",
post(endpoints::otlp_proxy::otlp_proxy_handler),
)
.route(
"/v1/logs",
post(endpoints::otlp_proxy::otlp_proxy_handler),
)
.layer(DefaultBodyLimit::max(5 * 1024 * 1024)); // 5 MB limit

let routes = match &app_state.authentication_info {
AuthenticationInfo::Enabled(auth) => routes.layer(
axum::middleware::from_fn_with_state(auth.clone(), require_api_key_telemetry),
),
AuthenticationInfo::Disabled => routes,
};
Some(routes)
} else {
None
};

// Routes that don't require authentication
let public_routes = Router::new()
.route("/inference", post(endpoints::inference::inference_handler))
Expand Down Expand Up @@ -435,7 +464,10 @@ async fn main() {
);

let mut router = Router::new()
.merge(openai_routes)
.merge(openai_routes);
// NOTE: OTLP routes are merged AFTER analytics/blocking middleware layers (below)
// so they bypass analytics_middleware, blocking_middleware, TraceLayer, and the 100MB body limit.
router = router
.merge(public_routes)
.fallback(endpoints::fallback::handle_404)
.layer(axum::middleware::from_fn(add_version_header))
Expand Down Expand Up @@ -504,6 +536,14 @@ async fn main() {
));
}

// Merge OTLP proxy routes AFTER analytics/blocking middleware layers.
// In Axum, .layer() only wraps routes already in the router, so OTLP routes
// bypass: analytics_middleware (no gateway_analytics spans), blocking_middleware,
// TraceLayer (no recursive tracing), and the 100MB DefaultBodyLimit (OTLP has its own 5MB limit).
if let Some(otlp) = otlp_routes {
router = router.merge(otlp);
}

let router = router.with_state(app_state);

// Bind to the socket address specified in the config, or default to 0.0.0.0:3000
Expand Down
36 changes: 36 additions & 0 deletions services/budgateway/tensorzero-internal/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,3 +433,39 @@ pub async fn require_api_key(

Ok(next.run(request).await)
}

/// Authentication middleware guarding the OTLP telemetry proxy routes.
///
/// Only the request headers are inspected: the `authorization` header is read,
/// surrounding whitespace is trimmed, an optional leading `Bearer ` prefix is
/// removed, and the remaining text is checked with `auth.validate_api_key`.
/// The request body is never read here, so it reaches `next` untouched.
///
/// # Errors
///
/// Returns a `StatusCode::UNAUTHORIZED` response built by `auth_error_response`
/// when the header is absent or cannot be viewed as a string
/// ("Missing authorization header"), or when key validation fails
/// ("Invalid API key").
pub async fn require_api_key_telemetry(
    State(auth): State<Auth>,
    request: Request,
    next: Next,
) -> Result<Response, Response> {
    // A header that is missing and a header that fails `to_str` are reported identically.
    let header_text = request
        .headers()
        .get("authorization")
        .and_then(|value| value.to_str().ok())
        .ok_or_else(|| {
            auth_error_response(
                StatusCode::UNAUTHORIZED,
                "Missing authorization header",
            )
        })?;

    // Trim first, then drop the scheme; a value without the prefix is used as-is.
    let credential = header_text.trim();
    let key = credential
        .strip_prefix("Bearer ")
        .unwrap_or(credential)
        .to_string();

    // Reduce the validation result to a bool so nothing from it lives across the `.await`.
    let key_is_valid = auth.validate_api_key(&key).is_ok();
    if !key_is_valid {
        return Err(auth_error_response(
            StatusCode::UNAUTHORIZED,
            "Invalid API key",
        ));
    }

    Ok(next.run(request).await)
}
Comment on lines +439 to +471
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic for extracting and validating the API key can be made more idiomatic and efficient. The current implementation involves an extra String allocation and a slightly verbose match statement.

Refactoring this to use ok_or_else and process the &str slice directly will make the code cleaner and avoid the unnecessary allocation, improving performance slightly.

/// Suggested allocation-free variant of the OTLP telemetry auth middleware.
///
/// Reads the `authorization` header, removes a leading `Bearer ` prefix when
/// present, trims the remainder, and validates it with `auth.validate_api_key`.
/// The key is a `&str` borrowed from the request headers rather than an owned `String`.
///
/// # Errors
///
/// Returns a `StatusCode::UNAUTHORIZED` response built by `auth_error_response`
/// when the header is absent or fails `to_str`
/// ("Missing or invalid authorization header"), or when validation fails
/// ("Invalid API key").
pub async fn require_api_key_telemetry(
    State(auth): State<Auth>,
    request: Request,
    next: Next,
) -> Result<Response, Response> {
    // The prefix is stripped BEFORE trimming, so a value with whitespace ahead of
    // `Bearer ` keeps the prefix and only has its outer whitespace removed.
    let key = request
        .headers()
        .get("authorization")
        .and_then(|v| v.to_str().ok())
        .map(|s| s.strip_prefix("Bearer ").unwrap_or(s).trim())
        .ok_or_else(|| {
            auth_error_response(
                StatusCode::UNAUTHORIZED,
                "Missing or invalid authorization header",
            )
        })?;

    // `key` borrows from `request`; its last use is here, before `request` is moved below.
    if auth.validate_api_key(key).is_err() {
        return Err(auth_error_response(
            StatusCode::UNAUTHORIZED,
            "Invalid API key",
        ));
    }

    Ok(next.run(request).await)
}

19 changes: 19 additions & 0 deletions services/budgateway/tensorzero-internal/src/config_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ pub struct GatewayConfig {
pub analytics: AnalyticsConfig,
#[serde(default)]
pub blocking: BlockingConfig,
#[serde(default)]
pub otlp_proxy: OtlpProxyConfig,
}

#[derive(Clone, Debug, Default, Deserialize, PartialEq)]
Expand All @@ -93,6 +95,23 @@ pub struct BlockingConfig {
pub enabled: bool,
}

/// Settings for the gateway's OTLP telemetry proxy (`[gateway.otlp_proxy]`).
///
/// `#[serde(default)]` at the struct level means any field missing from the
/// config file takes its value from the `Default` implementation.
#[derive(Debug, Clone, Deserialize)]
#[serde(default)]
pub struct OtlpProxyConfig {
/// When `true`, the gateway registers the `/v1/traces`, `/v1/metrics` and
/// `/v1/logs` proxy routes.
pub enabled: bool,
/// Base URL of the OTEL collector; the proxy handler appends the incoming
/// request path to it to form the upstream URL.
pub collector_endpoint: String,
}

impl Default for OtlpProxyConfig {
    /// Builds the fallback configuration: proxy disabled, collector endpoint taken
    /// from the `OTEL_EXPORTER_OTLP_HTTP_ENDPOINT` environment variable.
    ///
    /// If the variable is unset, not valid Unicode, empty, or only whitespace,
    /// the endpoint falls back to `http://localhost:4318`. An empty value is
    /// treated as unset because the proxy handler concatenates this endpoint with
    /// the request path; an empty base would yield a scheme-less URL such as
    /// `/v1/traces` that can never be reached.
    fn default() -> Self {
        let collector_endpoint = std::env::var("OTEL_EXPORTER_OTLP_HTTP_ENDPOINT")
            .ok()
            // Surrounding whitespace is never part of a usable URL.
            .map(|value| value.trim().to_owned())
            .filter(|value| !value.is_empty())
            .unwrap_or_else(|| "http://localhost:4318".to_owned());

        Self {
            enabled: false,
            collector_endpoint,
        }
    }
}

#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct GCPProviderTypeConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub mod model_resolution;
pub mod object_storage;
pub mod observability;
pub mod openai_compatible;
pub mod otlp_proxy;
pub mod status;

pub fn validate_tags(tags: &HashMap<String, String>, internal: bool) -> Result<(), Error> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use axum::body::Body;
use axum::extract::State;
use axum::http::{HeaderMap, StatusCode, Uri};
use axum::response::{IntoResponse, Response};
use std::time::Duration;

use crate::gateway_util::AppStateData;

const OTLP_PROXY_TIMEOUT: Duration = Duration::from_secs(10);

/// POST /v1/traces, /v1/metrics, /v1/logs
/// Transparent proxy to the internal OTEL collector.
/// Auth is handled by the require_api_key_telemetry middleware.
pub async fn otlp_proxy_handler(
State(app_state): State<AppStateData>,
uri: Uri,
headers: HeaderMap,
body: Body,
) -> Result<Response, Response> {
let collector_endpoint = &app_state.config.gateway.otlp_proxy.collector_endpoint;
let url = format!("{}{}", collector_endpoint, uri.path());

let mut req = app_state
.http_client
.post(&url)
.timeout(OTLP_PROXY_TIMEOUT)
.body(reqwest::Body::wrap_stream(body.into_data_stream()));

for (name, value) in headers.iter() {
if name != "host" && name != "connection" && name != "authorization" {
req = req.header(name, value);
}
}

match req.send().await {
Ok(resp) => {
let status = StatusCode::from_u16(resp.status().as_u16())
.unwrap_or(StatusCode::BAD_GATEWAY);
let resp_headers = resp.headers().clone();
let resp_body = resp.bytes().await.unwrap_or_default();

let mut response = (status, resp_body).into_response();
for (name, value) in resp_headers.iter() {
response.headers_mut().insert(name, value.clone());
}
Ok(response)
}
Comment on lines +36 to +47
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The current implementation buffers the entire upstream response body in memory using resp.bytes().await before sending it to the client. For a proxy handling potentially large payloads (even with a 5MB limit), it's more memory-efficient and robust to stream the response body directly.

Streaming avoids holding the entire response in memory and handles network interruptions more gracefully. Additionally, the status code handling can be simplified by using resp.status() directly.

        Ok(resp) => {
            let mut response_builder = Response::builder().status(resp.status());

            // Copy headers from the upstream response.
            if let Some(headers) = response_builder.headers_mut() {
                headers.extend(resp.headers().clone());
            }

            // Stream the body from the upstream response.
            let body = Body::from_stream(resp.bytes_stream());

            // It's safe to unwrap here as we've built a valid response.
            Ok(response_builder.body(body).unwrap())
        }

Err(e) => {
tracing::warn!(error = %e, url = %url, "OTLP proxy: failed to reach OTEL collector");
Err((StatusCode::BAD_GATEWAY, "OTEL collector unavailable").into_response())
}
}
}
4 changes: 4 additions & 0 deletions services/budgateway/tensorzero-internal/src/gateway_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,7 @@ mod tests {
geoip_db_path: None,
},
blocking: BlockingConfig { enabled: false },
otlp_proxy: Default::default(),
};

let config = Box::leak(Box::new(Config {
Expand Down Expand Up @@ -747,6 +748,7 @@ mod tests {
geoip_db_path: None,
},
blocking: BlockingConfig { enabled: false },
otlp_proxy: Default::default(),
};

let config = Box::leak(Box::new(Config {
Expand Down Expand Up @@ -777,6 +779,7 @@ mod tests {
geoip_db_path: None,
},
blocking: BlockingConfig { enabled: false },
otlp_proxy: Default::default(),
};
let config = Box::leak(Box::new(Config {
gateway: gateway_config,
Expand Down Expand Up @@ -975,6 +978,7 @@ mod tests {
geoip_db_path: None,
},
blocking: BlockingConfig { enabled: false },
otlp_proxy: Default::default(),
};
let config = Config {
gateway: gateway_config,
Expand Down
Loading