diff --git a/lib/bindings/python/Cargo.lock b/lib/bindings/python/Cargo.lock
index c542157f066..cbe1ab6ba18 100644
--- a/lib/bindings/python/Cargo.lock
+++ b/lib/bindings/python/Cargo.lock
@@ -1212,6 +1212,7 @@ dependencies = [
  "rand 0.9.1",
  "rayon",
  "regex",
+ "reqwest",
  "rmp-serde",
  "serde",
  "serde_json",
diff --git a/lib/llm/Cargo.toml b/lib/llm/Cargo.toml
index 062faf46f2d..42ed0750285 100644
--- a/lib/llm/Cargo.toml
+++ b/lib/llm/Cargo.toml
@@ -62,6 +62,7 @@ humantime = { workspace = true } # input/batch
 rand = { workspace = true }
 oneshot = { workspace = true }
 prometheus = { workspace = true }
+reqwest = { workspace = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
 strum = { workspace = true }
diff --git a/lib/llm/src/http/service/metrics.rs b/lib/llm/src/http/service/metrics.rs
index 181763895cc..2cdc8d8aed5 100644
--- a/lib/llm/src/http/service/metrics.rs
+++ b/lib/llm/src/http/service/metrics.rs
@@ -3,10 +3,13 @@
 use axum::{extract::State, http::StatusCode, response::IntoResponse, routing::get, Router};
 use prometheus::{Encoder, HistogramOpts, HistogramVec, IntCounterVec, IntGaugeVec, Opts};
+use reqwest::Client;
+use serde::Serialize;
 use std::{
     sync::Arc,
-    time::{Duration, Instant},
+    time::{Duration, Instant, SystemTime, UNIX_EPOCH},
 };
+use tokio::spawn;
 
 pub use prometheus::Registry;
 
@@ -24,6 +27,107 @@ pub const REQUEST_TYPE_STREAM: &str = "stream";
 /// Partial value for the `type` label in the request counter for unary requests
 pub const REQUEST_TYPE_UNARY: &str = "unary";
 
+/// Request data for external observability integration
+#[derive(Serialize)]
+pub struct HeliconeProviderRequest {
+    pub url: String,
+    pub json: serde_json::Value,
+    pub meta: serde_json::Value,
+}
+
+/// Response data for external observability integration
+#[derive(Serialize)]
+pub struct HeliconeProviderResponse {
+    pub json: serde_json::Value,
+    pub status: u16,
+    pub headers: serde_json::Value,
+}
+
+/// Timing information for the request/response cycle
+#[derive(Serialize)]
+pub struct HeliconeTiming {
+    #[serde(rename = "startTime")]
+    pub start_time: String,
+    #[serde(rename = "endTime")]
+    pub end_time: String,
+    #[serde(rename = "timeToFirstToken")]
+    pub time_to_first_token: Option<u64>,
+}
+
+/// Complete trace data for external observability integration
+#[derive(Serialize)]
+pub struct HeliconeTrace {
+    #[serde(rename = "providerRequest")]
+    pub provider_request: HeliconeProviderRequest,
+    #[serde(rename = "providerResponse", skip_serializing_if = "Option::is_none")]
+    pub provider_response: Option<HeliconeProviderResponse>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub timing: Option<HeliconeTiming>,
+    pub provider: String,
+}
+
+/// Logger for external observability integration
+#[derive(Clone)]
+pub struct HeliconeLogger {
+    client: Client,
+    endpoint: String,
+    api_key: Option<String>,
+    enabled: bool,
+}
+
+impl HeliconeLogger {
+    pub fn new(endpoint: Option<String>) -> Self {
+        let endpoint = endpoint.unwrap_or_default();
+        let api_key = std::env::var("HELICONE_API_KEY").ok();
+        let enabled = !endpoint.is_empty();
+
+        if enabled {
+            tracing::debug!(
+                "Helicone logger initialized: endpoint={}, has_api_key={}",
+                endpoint,
+                api_key.is_some()
+            );
+        }
+
+        Self {
+            client: Client::new(),
+            endpoint,
+            api_key,
+            enabled,
+        }
+    }
+
+    pub fn log_async(&self, trace: HeliconeTrace) {
+        if !self.enabled {
+            return;
+        }
+
+        let client = self.client.clone();
+        let endpoint = self.endpoint.clone();
+        let api_key = self.api_key.clone();
+
+        spawn(async move {
+            let mut request = client
+                .post(&endpoint)
+                .header("Content-Type", "application/json");
+
+            // Add the Authorization header if an API key is available
+            if let Some(key) = api_key {
+                request = request.header("Authorization", format!("Bearer {}", key));
+            }
+
+            match request.json(&trace).send().await {
+                Ok(response) => {
+                    if response.status().is_success() {
+                        tracing::debug!("Sent Helicone trace successfully");
+                    } else {
+                        tracing::warn!(
+                            "Helicone trace request failed with status: {}",
+                            response.status()
+                        );
+                    }
+                }
+                Err(e) => {
+                    tracing::error!("Failed to send Helicone trace: {}", e);
+                }
+            }
+        });
+    }
+}
+
 pub struct Metrics {
     request_counter: IntCounterVec,
     inflight_gauge: IntGaugeVec,
@@ -32,6 +136,7 @@ pub struct Metrics {
     output_sequence_length: HistogramVec,
     time_to_first_token: HistogramVec,
     inter_token_latency: HistogramVec,
+    helicone_logger: HeliconeLogger,
 }
 
 /// RAII object for inflight gauge and request counters
@@ -82,7 +187,9 @@ pub enum Status {
 pub struct ResponseMetricCollector {
     metrics: Arc<Metrics>,
     model: String,
+    request_id: String,
     start_time: Instant,
+    start_timestamp: u64,
     // we use is_first_token to distinguish TTFT from ITL. It is true by default and
     // flipped to false when the first token is returned and TTFT is published.
     is_first_token: bool,
@@ -90,16 +197,25 @@ pub struct ResponseMetricCollector {
     // be computed.
     last_response_time: Option<Duration>,
     osl: usize,
+    // Store the original request data for Helicone logging
+    original_request: Option<serde_json::Value>,
+    endpoint_url: String,
+    time_to_first_token: Option<u64>,
+    // Accumulate response chunks to build the complete response
+    response_chunks: Vec<String>,
+    // Track token counts for Helicone logging
+    input_tokens: usize,
+    total_output_tokens: usize,
 }
 
 impl Default for Metrics {
     fn default() -> Self {
-        Self::new("nv_llm")
+        Self::new("nv_llm", None)
     }
 }
 
 impl Metrics {
-    /// Create Metrics with the given prefix
+    /// Create Metrics with the given prefix and optional Helicone endpoint
     /// The following metrics will be created:
    /// - `{prefix}_http_service_requests_total` - IntCounterVec for the total number of requests processed
    /// - `{prefix}_http_service_inflight_requests` - IntGaugeVec for the number of inflight requests
@@ -108,7 +224,7 @@ impl Metrics {
    /// - `{prefix}_http_service_output_sequence_tokens` - HistogramVec for output sequence length in tokens
    /// - `{prefix}_http_service_time_to_first_token_seconds` - HistogramVec for time to first token in seconds
    /// - `{prefix}_http_service_inter_token_latency_seconds` - HistogramVec for inter-token latency in seconds
-    pub fn new(prefix: &str) -> Self {
+    pub fn new(prefix: &str, helicone_endpoint: Option<String>) -> Self {
         let request_counter = IntCounterVec::new(
             Opts::new(
                 format!("{}_http_service_requests_total", prefix),
@@ -197,6 +313,7 @@ impl Metrics {
             output_sequence_length,
             time_to_first_token,
             inter_token_latency,
+            helicone_logger: HeliconeLogger::new(helicone_endpoint),
         }
     }
 
@@ -294,8 +411,20 @@ impl Metrics {
     }
 
     /// Create a new [`ResponseMetricCollector`] for collecting per-response metrics (i.e., TTFT, ITL)
-    pub fn create_response_collector(self: Arc<Self>, model: &str) -> ResponseMetricCollector {
-        ResponseMetricCollector::new(self, model.to_string().to_lowercase())
+    pub fn create_response_collector(
+        self: Arc<Self>,
+        model: &str,
+        request_id: &str,
+        original_request: Option<serde_json::Value>,
+        endpoint_url: String,
+    ) -> ResponseMetricCollector {
+        ResponseMetricCollector::new(
+            self,
+            model.to_string().to_lowercase(),
+            request_id.to_string(),
+            original_request,
+            endpoint_url,
+        )
     }
 }
@@ -392,14 +521,49 @@ impl Status {
 }
 
 impl ResponseMetricCollector {
-    fn new(metrics: Arc<Metrics>, model: String) -> Self {
+    fn new(
+        metrics: Arc<Metrics>,
+        model: String,
+        request_id: String,
+        original_request: Option<serde_json::Value>,
+        endpoint_url: String,
+    ) -> Self {
+        let start_timestamp = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .unwrap()
+            .as_millis() as u64;
+
+        // Send the initial request trace to the observability backend if configured
+        if let Some(ref request_data) = original_request {
+            let trace = HeliconeTrace {
+                provider_request: HeliconeProviderRequest {
+                    url: endpoint_url.clone(),
+                    json: request_data.clone(),
+                    meta: serde_json::json!({
+                        "helicone-request-id": request_id,
+                        "helicone-user-id": "dynamo-user"
+                    }),
+                },
+                provider_response: None,
+                timing: None,
+                provider: "OPENAI".to_string(),
+            };
+
+            metrics.helicone_logger.log_async(trace);
+        }
+
         ResponseMetricCollector {
             metrics,
             model,
+            request_id,
+            start_timestamp,
             is_first_token: true,
             last_response_time: None,
             start_time: Instant::now(),
             osl: 0,
+            original_request,
+            endpoint_url,
+            time_to_first_token: None,
+            response_chunks: Vec::new(),
+            input_tokens: 0,
+            total_output_tokens: 0,
         }
     }
 
@@ -414,11 +578,18 @@ impl ResponseMetricCollector {
             return;
         }
 
+        // Update token counts for observability
+        self.input_tokens = isl;
+        self.total_output_tokens += num_tokens;
+
         if self.is_first_token {
             // NOTE: when there are multiple tokens in the first response,
             // we use the full response time as TTFT and ignore the ITL
             self.is_first_token = false;
 
+            // Capture TTFT on the first token
+            self.time_to_first_token = Some(self.start_time.elapsed().as_millis() as u64);
+
             // Publish TTFT
             let ttft = self.start_time.elapsed().as_secs_f64();
             self.metrics
@@ -449,6 +620,11 @@ impl ResponseMetricCollector {
         self.last_response_time = Some(current_duration);
     }
+
+    /// Capture response text chunks for building the complete response
+    pub fn observe_response_chunk(&mut self, chunk: &str) {
+        self.response_chunks.push(chunk.to_string());
+    }
 }
 
 impl Drop for ResponseMetricCollector {
     fn drop(&mut self) {
@@ -458,6 +634,59 @@ impl Drop for ResponseMetricCollector {
             .output_sequence_length
             .with_label_values(&[&self.model])
             .observe(self.osl as f64);
+
+        // Send the complete trace, with request and response data, to the observability backend
+        if let Some(ref original_request) = self.original_request {
+            let end_timestamp = SystemTime::now()
+                .duration_since(UNIX_EPOCH)
+                .unwrap()
+                .as_millis() as u64;
+            let complete_response = self.response_chunks.join("");
+
+            // Build a proper OpenAI-style response; take at most the first
+            // eight characters of the request id so short ids cannot panic
+            let short_id = self.request_id.get(..8).unwrap_or(&self.request_id);
+            let response_json = serde_json::json!({
+                "id": format!("chatcmpl-{}", short_id),
+                "object": "chat.completion",
+                "created": self.start_timestamp / 1000,
+                "model": &self.model,
+                "choices": [{
+                    "index": 0,
+                    "message": {
+                        "role": "assistant",
+                        "content": complete_response
+                    },
+                    "finish_reason": "stop"
+                }],
+                "usage": {
+                    "prompt_tokens": self.input_tokens,
+                    "completion_tokens": self.total_output_tokens,
+                    "total_tokens": self.input_tokens + self.total_output_tokens
+                }
+            });
+
+            let trace = HeliconeTrace {
+                provider_request: HeliconeProviderRequest {
+                    url: self.endpoint_url.clone(),
+                    json: original_request.clone(),
+                    meta: serde_json::json!({
+                        "helicone-request-id": &self.request_id,
+                        "helicone-user-id": "dynamo-user"
+                    }),
+                },
+                provider_response: Some(HeliconeProviderResponse {
+                    json: response_json,
+                    status: 200,
+                    headers: serde_json::json!({
+                        "content-type": "application/json"
+                    }),
+                }),
+                timing: Some(HeliconeTiming {
+                    start_time: (self.start_timestamp / 1000).to_string(),
+                    end_time: (end_timestamp / 1000).to_string(),
+                    time_to_first_token: self.time_to_first_token,
+                }),
+                provider: "OPENAI".to_string(),
+            };
+
+            self.metrics.helicone_logger.log_async(trace);
+        }
     }
 }
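For reference, the `serde` rename attributes on the structs above control the exact JSON shape that goes over the wire. A minimal sketch of what `HeliconeTrace` serializes to, assuming the types from this patch are in scope (all values below are illustrative, not from the patch); note that `provider_response` and `timing` are omitted entirely when `None` because of `skip_serializing_if`:

```rust
use serde_json::json;

// Illustrative only; not part of this patch.
let trace = HeliconeTrace {
    provider_request: HeliconeProviderRequest {
        url: "https://api.openai.com/v1/chat/completions".to_string(),
        json: json!({ "model": "example-model", "messages": [] }),
        meta: json!({ "helicone-request-id": "req-123" }),
    },
    provider_response: None,
    timing: None,
    provider: "OPENAI".to_string(),
};

// camelCase keys come from the #[serde(rename = ...)] attributes above.
assert_eq!(
    serde_json::to_value(&trace).unwrap(),
    json!({
        "providerRequest": {
            "url": "https://api.openai.com/v1/chat/completions",
            "json": { "model": "example-model", "messages": [] },
            "meta": { "helicone-request-id": "req-123" }
        },
        "provider": "OPENAI"
    })
);
```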
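The collector now logs twice per request: once at construction with the request only (`provider_response: None`), and once in `Drop`, where the chunks observed during the response are joined into the final body. A standalone sketch of that accumulation (not from the patch): because `join("")` adds no separator, chunks must carry their own whitespace.

```rust
// Illustrative only: mirrors observe_response_chunk() followed by
// the join("") performed in Drop.
let mut response_chunks: Vec<String> = Vec::new();
for chunk in ["Hello", ", ", "world", "!"] {
    response_chunks.push(chunk.to_string());
}
assert_eq!(response_chunks.join(""), "Hello, world!");
```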
diff --git a/lib/llm/src/http/service/openai.rs b/lib/llm/src/http/service/openai.rs
index b7de93f1cb7..e5266895daa 100644
--- a/lib/llm/src/http/service/openai.rs
+++ b/lib/llm/src/http/service/openai.rs
@@ -233,7 +233,16 @@ async fn completions(
         .metrics_clone()
         .create_inflight_guard(model, Endpoint::Completions, streaming);
 
-    let mut response_collector = state.metrics_clone().create_response_collector(model);
+    // Capture request data for observability
+    let request_json = serde_json::to_value(&request.inner).ok();
+    let endpoint_url = "https://api.openai.com/v1/completions".to_string();
+
+    let mut response_collector = state.metrics_clone().create_response_collector(
+        model,
+        &request_id,
+        request_json,
+        endpoint_url,
+    );
 
     // prepare to process any annotations
     let annotations = request.annotations();
@@ -293,6 +302,11 @@ async fn completions(
                 ErrorMessage::internal_server_error("Failed to fold completions stream")
             })?;
 
+        // Capture response content for observability
+        for choice in &response.inner.choices {
+            response_collector.observe_response_chunk(&choice.text);
+        }
+
         inflight_guard.mark_ok();
         Ok(Json(response).into_response())
     }
@@ -456,7 +470,16 @@ async fn chat_completions(
         .metrics_clone()
         .create_inflight_guard(model, Endpoint::ChatCompletions, streaming);
 
-    let mut response_collector = state.metrics_clone().create_response_collector(model);
+    // Capture request data for observability
+    let request_json = serde_json::to_value(&request.inner).ok();
+    let endpoint_url = "https://api.openai.com/v1/chat/completions".to_string();
+
+    let mut response_collector = state.metrics_clone().create_response_collector(
+        model,
+        &request_id,
+        request_json,
+        endpoint_url,
+    );
 
     tracing::trace!("Issuing generate call for chat completions");
     let annotations = request.annotations();
@@ -521,6 +544,13 @@ async fn chat_completions(
             ))
         })?;
 
+        // Capture response content for observability
+        for choice in &response.inner.choices {
+            if let Some(content) = &choice.message.content {
+                response_collector.observe_response_chunk(content);
+            }
+        }
+
         inflight_guard.mark_ok();
         Ok(Json(response).into_response())
     }
@@ -685,7 +715,16 @@ async fn responses(
         .metrics_clone()
         .create_inflight_guard(model, Endpoint::Responses, false);
 
-    let _response_collector = state.metrics_clone().create_response_collector(model);
+    // Capture request data for observability
+    let request_json = serde_json::to_value(&request.inner).ok();
+    let endpoint_url = "https://api.openai.com/v1/responses".to_string();
+
+    let _response_collector = state.metrics_clone().create_response_collector(
+        model,
+        &request_id,
+        request_json,
+        endpoint_url,
+    );
 
     tracing::trace!("Issuing generate call for chat completions");
@@ -902,7 +941,7 @@ impl<T> From<Annotated<T>> for EventConverter<T> {
     }
 }
 
-fn process_event_converter<T: Serialize>(
+fn process_event_converter<T: Serialize + Clone>(
     annotated: EventConverter<T>,
     response_collector: &mut ResponseMetricCollector,
 ) -> Result<Event, axum::Error> {
@@ -923,8 +962,25 @@
     let mut event = Event::default();
 
-    if let Some(data) = annotated.data {
-        event = event.json_data(data)?;
+    if let Some(data) = &annotated.data {
+        // Extract response content for observability
+        if let Ok(data_value) = serde_json::to_value(data) {
+            if let Some(choices) = data_value.get("choices").and_then(|c| c.as_array()) {
+                for choice in choices {
+                    // For the completions endpoint - look for the "text" field
+                    if let Some(text) = choice.get("text").and_then(|t| t.as_str()) {
+                        response_collector.observe_response_chunk(text);
+                    }
+                    // For the chat completions endpoint - look for delta content
+                    if let Some(delta) = choice.get("delta") {
+                        if let Some(content) = delta.get("content").and_then(|c| c.as_str()) {
+                            response_collector.observe_response_chunk(content);
+                        }
+                    }
+                }
+            }
+        }
+        event = event.json_data(data.clone())?;
     }
 
     if let Some(msg) = annotated.event {
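On the streaming path, the handler peeks into each SSE chunk's JSON to recover text for both endpoint shapes. A self-contained sketch of the same extraction, using a hypothetical helper `extract_content` and an illustrative chat-completions delta chunk (neither is part of the patch):

```rust
use serde_json::{json, Value};

// Hypothetical helper mirroring the extraction in process_event_converter.
fn extract_content(chunk: &Value) -> Vec<&str> {
    let mut out = Vec::new();
    if let Some(choices) = chunk.get("choices").and_then(|c| c.as_array()) {
        for choice in choices {
            // Completions endpoint: top-level "text" field
            if let Some(text) = choice.get("text").and_then(|t| t.as_str()) {
                out.push(text);
            }
            // Chat completions endpoint: "delta"."content" field
            if let Some(content) = choice
                .get("delta")
                .and_then(|d| d.get("content"))
                .and_then(|c| c.as_str())
            {
                out.push(content);
            }
        }
    }
    out
}

let chat_chunk = json!({ "choices": [{ "index": 0, "delta": { "content": "Hel" } }] });
assert_eq!(extract_content(&chat_chunk), vec!["Hel"]);
```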
diff --git a/lib/llm/src/http/service/service_v2.rs b/lib/llm/src/http/service/service_v2.rs
index 0b2af7763cc..00344e84f97 100644
--- a/lib/llm/src/http/service/service_v2.rs
+++ b/lib/llm/src/http/service/service_v2.rs
@@ -23,9 +23,37 @@ pub struct State {
 
 impl State {
     pub fn new(manager: Arc<ModelManager>) -> Self {
+        // Check for the Helicone endpoint and API key in environment variables
+        let helicone_base_url = std::env::var("HELICONE_ENDPOINT").ok();
+        let helicone_api_key = std::env::var("HELICONE_API_KEY").ok();
+
+        // Construct the full Helicone trace endpoint
+        let helicone_endpoint = helicone_base_url.as_ref().map(|base_url| {
+            if base_url.ends_with("/") {
+                format!("{}v1/trace/custom/log", base_url)
+            } else {
+                format!("{}/v1/trace/custom/log", base_url)
+            }
+        });
+
+        // Log Helicone integration status
+        match (&helicone_endpoint, &helicone_api_key) {
+            (Some(endpoint), Some(_)) => tracing::info!(
+                "Helicone integration enabled: {} (with API key)",
+                endpoint
+            ),
+            (Some(endpoint), None) => tracing::info!(
+                "Helicone integration enabled: {} (no API key)",
+                endpoint
+            ),
+            (None, _) => tracing::debug!(
+                "Helicone integration disabled (no HELICONE_ENDPOINT set)"
+            ),
+        }
+
         Self {
             manager,
-            metrics: Arc::new(Metrics::default()),
+            metrics: Arc::new(Metrics::new("nv_llm", helicone_endpoint)),
         }
     }
@@ -112,6 +140,7 @@
         let address = format!("{}:{}", self.host, self.port);
         tracing::info!(address, "Starting HTTP service on: {address}");
+
         let listener = tokio::net::TcpListener::bind(address.as_str())
             .await
             .unwrap_or_else(|_| panic!("could not bind to address: {address}"));
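Configuration is entirely environment-driven: set `HELICONE_ENDPOINT` to the base URL and, optionally, `HELICONE_API_KEY` for the `Authorization` header. The trailing-slash handling in `State::new` accepts either form of base URL; a sketch of the same logic as a hypothetical standalone helper (the host below is illustrative, not a real endpoint):

```rust
// Hypothetical helper mirroring the endpoint construction in State::new.
fn join_trace_endpoint(base_url: &str) -> String {
    if base_url.ends_with('/') {
        format!("{}v1/trace/custom/log", base_url)
    } else {
        format!("{}/v1/trace/custom/log", base_url)
    }
}

// Both forms resolve to the same trace URL.
assert_eq!(
    join_trace_endpoint("https://api.helicone.example"),
    "https://api.helicone.example/v1/trace/custom/log"
);
assert_eq!(
    join_trace_endpoint("https://api.helicone.example/"),
    "https://api.helicone.example/v1/trace/custom/log"
);
```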