diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 90acb1e7726..e387841a58a 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -1605,10 +1605,12 @@ dependencies = [ "opentelemetry-otlp", "opentelemetry-semantic-conventions", "opentelemetry_sdk", + "pretty_assertions", "reqwest", "serde", "serde_json", "strum_macros 0.27.2", + "thiserror 2.0.17", "tokio", "tracing", "tracing-opentelemetry", diff --git a/codex-rs/core/Cargo.toml b/codex-rs/core/Cargo.toml index 51fab19decf..8bf68d7edd6 100644 --- a/codex-rs/core/Cargo.toml +++ b/codex-rs/core/Cargo.toml @@ -122,7 +122,8 @@ keyring = { workspace = true, features = ["sync-secret-service"] } assert_cmd = { workspace = true } assert_matches = { workspace = true } codex-arg0 = { workspace = true } -codex-core = { path = ".", features = ["deterministic_process_ids"] } +codex-core = { path = ".", default-features = false, features = ["deterministic_process_ids"] } +codex-otel = { workspace = true, features = ["disable-default-metrics-exporter"] } codex-utils-cargo-bin = { workspace = true } core_test_support = { workspace = true } ctor = { workspace = true } diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index 785d0475b72..0023603f2fd 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -18,7 +18,7 @@ use codex_api::common::Reasoning; use codex_api::create_text_param_for_request; use codex_api::error::ApiError; use codex_app_server_protocol::AuthMode; -use codex_otel::otel_manager::OtelManager; +use codex_otel::OtelManager; use codex_protocol::ThreadId; use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig; use codex_protocol::models::ResponseItem; diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 828a6bab017..1f65524bd9e 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -152,7 +152,7 @@ use crate::user_instructions::UserInstructions; use crate::user_notification::UserNotification; use crate::util::backoff; use codex_async_utils::OrCancelExt; -use codex_otel::otel_manager::OtelManager; +use codex_otel::OtelManager; use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig; use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseInputItem; @@ -655,6 +655,7 @@ impl Session { terminal::user_agent(), session_configuration.session_source.clone(), ); + config.features.emit_metrics(&otel_manager); otel_manager.conversation_starts( config.model_provider.name.as_str(), diff --git a/codex-rs/core/src/config/mod.rs b/codex-rs/core/src/config/mod.rs index 035794f4aa9..06d577ed5b6 100644 --- a/codex-rs/core/src/config/mod.rs +++ b/codex-rs/core/src/config/mod.rs @@ -1440,11 +1440,13 @@ impl Config { .unwrap_or(DEFAULT_OTEL_ENVIRONMENT.to_string()); let exporter = t.exporter.unwrap_or(OtelExporterKind::None); let trace_exporter = t.trace_exporter.unwrap_or_else(|| exporter.clone()); + let metrics_exporter = t.metrics_exporter.unwrap_or(OtelExporterKind::Statsig); OtelConfig { log_user_prompt, environment, exporter, trace_exporter, + metrics_exporter, } }, }; diff --git a/codex-rs/core/src/config/types.rs b/codex-rs/core/src/config/types.rs index 6c421d4608f..191b44c780a 100644 --- a/codex-rs/core/src/config/types.rs +++ b/codex-rs/core/src/config/types.rs @@ -306,6 +306,7 @@ pub struct OtelTlsConfig { #[serde(rename_all = "kebab-case")] pub enum OtelExporterKind { None, + Statsig, OtlpHttp { endpoint: String, #[serde(default)] @@ -337,6 +338,11 @@ pub struct OtelConfigToml { /// Optional trace exporter pub trace_exporter: Option, + + /// Optional metrics exporter + /// + /// Defaults to `statsig` outside of tests. + pub metrics_exporter: Option, } /// Effective OTEL settings after defaults are applied. @@ -346,6 +352,7 @@ pub struct OtelConfig { pub environment: String, pub exporter: OtelExporterKind, pub trace_exporter: OtelExporterKind, + pub metrics_exporter: OtelExporterKind, } impl Default for OtelConfig { @@ -355,6 +362,7 @@ impl Default for OtelConfig { environment: DEFAULT_OTEL_ENVIRONMENT.to_owned(), exporter: OtelExporterKind::None, trace_exporter: OtelExporterKind::None, + metrics_exporter: OtelExporterKind::Statsig, } } } diff --git a/codex-rs/core/src/features.rs b/codex-rs/core/src/features.rs index 2c1dcc95826..ffc766f0f83 100644 --- a/codex-rs/core/src/features.rs +++ b/codex-rs/core/src/features.rs @@ -7,6 +7,7 @@ use crate::config::ConfigToml; use crate::config::profile::ConfigProfile; +use codex_otel::OtelManager; use serde::Deserialize; use serde::Serialize; use std::collections::BTreeMap; @@ -199,6 +200,21 @@ impl Features { .map(|usage| (usage.alias.as_str(), usage.feature)) } + pub fn emit_metrics(&self, otel: &OtelManager) { + for feature in FEATURES { + if self.enabled(feature.id) != feature.default_enabled { + otel.counter( + "codex.feature.state", + 1, + &[ + ("feature", feature.key), + ("value", &self.enabled(feature.id).to_string()), + ], + ); + } + } + } + /// Apply a table of key -> bool toggles (e.g. from TOML). pub fn apply_map(&mut self, m: &BTreeMap) { for (k, v) in m { diff --git a/codex-rs/core/src/otel_init.rs b/codex-rs/core/src/otel_init.rs index ece5a6bf500..f9bf75e8acb 100644 --- a/codex-rs/core/src/otel_init.rs +++ b/codex-rs/core/src/otel_init.rs @@ -6,7 +6,7 @@ use codex_otel::config::OtelExporter; use codex_otel::config::OtelHttpProtocol; use codex_otel::config::OtelSettings; use codex_otel::config::OtelTlsConfig as OtelTlsSettings; -use codex_otel::otel_provider::OtelProvider; +use codex_otel::traces::otel_provider::OtelProvider; use std::error::Error; /// Build an OpenTelemetry provider from the app Config. @@ -18,6 +18,7 @@ pub fn build_provider( ) -> Result, Box> { let to_otel_exporter = |kind: &Kind| match kind { Kind::None => OtelExporter::None, + Kind::Statsig => OtelExporter::Statsig, Kind::OtlpHttp { endpoint, headers, @@ -63,6 +64,11 @@ pub fn build_provider( let exporter = to_otel_exporter(&config.otel.exporter); let trace_exporter = to_otel_exporter(&config.otel.trace_exporter); + let metrics_exporter = if config.analytics { + to_otel_exporter(&config.otel.metrics_exporter) + } else { + OtelExporter::None + }; OtelProvider::from(&OtelSettings { service_name: originator().value.to_owned(), @@ -71,6 +77,7 @@ pub fn build_provider( environment: config.otel.environment.to_string(), exporter, trace_exporter, + metrics_exporter, }) } diff --git a/codex-rs/core/src/state/service.rs b/codex-rs/core/src/state/service.rs index 942df35c02d..2e4395956a5 100644 --- a/codex-rs/core/src/state/service.rs +++ b/codex-rs/core/src/state/service.rs @@ -10,7 +10,7 @@ use crate::skills::SkillsManager; use crate::tools::sandboxing::ApprovalStore; use crate::unified_exec::UnifiedExecProcessManager; use crate::user_notification::UserNotifier; -use codex_otel::otel_manager::OtelManager; +use codex_otel::OtelManager; use tokio::sync::Mutex; use tokio::sync::RwLock; use tokio_util::sync::CancellationToken; diff --git a/codex-rs/core/src/tasks/compact.rs b/codex-rs/core/src/tasks/compact.rs index a2a268fbb36..4b5f0d1cfb3 100644 --- a/codex-rs/core/src/tasks/compact.rs +++ b/codex-rs/core/src/tasks/compact.rs @@ -29,8 +29,16 @@ impl SessionTask for CompactTask { session.as_ref(), &ctx.client.get_provider(), ) { + let _ = session + .services + .otel_manager + .counter("codex.task.compact.remote", 1, &[]); crate::compact_remote::run_remote_compact_task(session, ctx).await } else { + let _ = session + .services + .otel_manager + .counter("codex.task.compact.local", 1, &[]); crate::compact::run_compact_task(session, ctx, input).await } diff --git a/codex-rs/core/src/tasks/review.rs b/codex-rs/core/src/tasks/review.rs index 4a2b587af48..193024b07e1 100644 --- a/codex-rs/core/src/tasks/review.rs +++ b/codex-rs/core/src/tasks/review.rs @@ -46,6 +46,12 @@ impl SessionTask for ReviewTask { input: Vec, cancellation_token: CancellationToken, ) -> Option { + let _ = session + .session + .services + .otel_manager + .counter("codex.task.review", 1, &[]); + // Start sub-codex conversation and get the receiver for events. let output = match start_review_conversation( session.clone(), diff --git a/codex-rs/core/src/tasks/undo.rs b/codex-rs/core/src/tasks/undo.rs index 5da7edd16fa..86232c094ce 100644 --- a/codex-rs/core/src/tasks/undo.rs +++ b/codex-rs/core/src/tasks/undo.rs @@ -38,6 +38,11 @@ impl SessionTask for UndoTask { _input: Vec, cancellation_token: CancellationToken, ) -> Option { + let _ = session + .session + .services + .otel_manager + .counter("codex.task.undo", 1, &[]); let sess = session.clone_session(); sess.send_event( ctx.as_ref(), diff --git a/codex-rs/core/src/tasks/user_shell.rs b/codex-rs/core/src/tasks/user_shell.rs index aec09514ca3..e76f70253ec 100644 --- a/codex-rs/core/src/tasks/user_shell.rs +++ b/codex-rs/core/src/tasks/user_shell.rs @@ -58,6 +58,12 @@ impl SessionTask for UserShellCommandTask { _input: Vec, cancellation_token: CancellationToken, ) -> Option { + let _ = session + .session + .services + .otel_manager + .counter("codex.task.user_shell", 1, &[]); + let event = EventMsg::TaskStarted(TaskStartedEvent { model_context_window: turn_context.client.get_model_context_window(), }); diff --git a/codex-rs/core/src/tools/orchestrator.rs b/codex-rs/core/src/tools/orchestrator.rs index 7853617238e..f0810916a55 100644 --- a/codex-rs/core/src/tools/orchestrator.rs +++ b/codex-rs/core/src/tools/orchestrator.rs @@ -17,6 +17,7 @@ use crate::tools::sandboxing::ToolCtx; use crate::tools::sandboxing::ToolError; use crate::tools::sandboxing::ToolRuntime; use crate::tools::sandboxing::default_exec_approval_requirement; +use codex_otel::ToolDecisionSource; use codex_protocol::protocol::AskForApproval; use codex_protocol::protocol::ReviewDecision; @@ -45,8 +46,8 @@ impl ToolOrchestrator { let otel = turn_ctx.client.get_otel_manager(); let otel_tn = &tool_ctx.tool_name; let otel_ci = &tool_ctx.call_id; - let otel_user = codex_otel::otel_manager::ToolDecisionSource::User; - let otel_cfg = codex_otel::otel_manager::ToolDecisionSource::Config; + let otel_user = ToolDecisionSource::User; + let otel_cfg = ToolDecisionSource::Config; // 1) Approval let mut already_approved = false; diff --git a/codex-rs/core/tests/chat_completions_payload.rs b/codex-rs/core/tests/chat_completions_payload.rs index 536de44e835..b4253e2d23e 100644 --- a/codex-rs/core/tests/chat_completions_payload.rs +++ b/codex-rs/core/tests/chat_completions_payload.rs @@ -13,7 +13,7 @@ use codex_core::Prompt; use codex_core::ResponseItem; use codex_core::WireApi; use codex_core::models_manager::manager::ModelsManager; -use codex_otel::otel_manager::OtelManager; +use codex_otel::OtelManager; use codex_protocol::ThreadId; use codex_protocol::models::ReasoningItemContent; use codex_protocol::protocol::SessionSource; diff --git a/codex-rs/core/tests/chat_completions_sse.rs b/codex-rs/core/tests/chat_completions_sse.rs index 17d4a5a4abc..cf05e68cfef 100644 --- a/codex-rs/core/tests/chat_completions_sse.rs +++ b/codex-rs/core/tests/chat_completions_sse.rs @@ -12,7 +12,7 @@ use codex_core::ResponseEvent; use codex_core::ResponseItem; use codex_core::WireApi; use codex_core::models_manager::manager::ModelsManager; -use codex_otel::otel_manager::OtelManager; +use codex_otel::OtelManager; use codex_protocol::ThreadId; use codex_protocol::models::ReasoningItemContent; use codex_protocol::protocol::SessionSource; diff --git a/codex-rs/core/tests/responses_headers.rs b/codex-rs/core/tests/responses_headers.rs index 188397fd24a..1546424dd8f 100644 --- a/codex-rs/core/tests/responses_headers.rs +++ b/codex-rs/core/tests/responses_headers.rs @@ -11,7 +11,7 @@ use codex_core::ResponseEvent; use codex_core::ResponseItem; use codex_core::WireApi; use codex_core::models_manager::manager::ModelsManager; -use codex_otel::otel_manager::OtelManager; +use codex_otel::OtelManager; use codex_protocol::ThreadId; use codex_protocol::config_types::ReasoningSummary; use codex_protocol::protocol::SessionSource; diff --git a/codex-rs/core/tests/suite/client.rs b/codex-rs/core/tests/suite/client.rs index 7d876ceb1f5..0423784a5ec 100644 --- a/codex-rs/core/tests/suite/client.rs +++ b/codex-rs/core/tests/suite/client.rs @@ -20,7 +20,7 @@ use codex_core::models_manager::manager::ModelsManager; use codex_core::protocol::EventMsg; use codex_core::protocol::Op; use codex_core::protocol::SessionSource; -use codex_otel::otel_manager::OtelManager; +use codex_otel::OtelManager; use codex_protocol::ThreadId; use codex_protocol::config_types::ReasoningSummary; use codex_protocol::config_types::Verbosity; diff --git a/codex-rs/otel/Cargo.toml b/codex-rs/otel/Cargo.toml index a703808e80b..eb19ec7df78 100644 --- a/codex-rs/otel/Cargo.toml +++ b/codex-rs/otel/Cargo.toml @@ -12,6 +12,13 @@ path = "src/lib.rs" [lints] workspace = true +[features] +## Disables the built-in default metrics exporter. +## +## Intended for use from `dev-dependencies` so unit/integration tests never +## attempt to export metrics over the network. +disable-default-metrics-exporter = [] + [dependencies] chrono = { workspace = true } codex-app-server-protocol = { workspace = true } @@ -19,13 +26,14 @@ codex-utils-absolute-path = { workspace = true } codex-api = { workspace = true } codex-protocol = { workspace = true } eventsource-stream = { workspace = true } -opentelemetry = { workspace = true, features = ["logs", "trace"] } +opentelemetry = { workspace = true, features = ["logs", "metrics", "trace"] } opentelemetry-appender-tracing = { workspace = true } opentelemetry-otlp = { workspace = true, features = [ "grpc-tonic", "http-proto", "http-json", "logs", + "metrics", "trace", "reqwest-blocking-client", "reqwest-rustls", @@ -33,16 +41,13 @@ opentelemetry-otlp = { workspace = true, features = [ "tls-roots", ]} opentelemetry-semantic-conventions = { workspace = true } -opentelemetry_sdk = { workspace = true, features = [ - "logs", - "rt-tokio", - "trace", -]} +opentelemetry_sdk = { workspace = true, features = ["logs", "metrics", "rt-tokio", "testing", "trace"] } http = { workspace = true } reqwest = { workspace = true, features = ["blocking", "rustls-tls"] } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } strum_macros = { workspace = true } +thiserror = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } tracing-opentelemetry = { workspace = true } @@ -50,3 +55,4 @@ tracing-subscriber = { workspace = true } [dev-dependencies] opentelemetry_sdk = { workspace = true, features = ["testing"] } +pretty_assertions = { workspace = true } diff --git a/codex-rs/otel/README.md b/codex-rs/otel/README.md new file mode 100644 index 00000000000..79f4e8f45c9 --- /dev/null +++ b/codex-rs/otel/README.md @@ -0,0 +1,128 @@ +# codex-otel + +`codex-otel` is the OpenTelemetry integration crate for Codex. It provides: + +- Trace/log exporters and tracing subscriber layers (`codex_otel::traces::otel_provider`). +- A structured event helper (`codex_otel::OtelManager`). +- OpenTelemetry metrics support via OTLP exporters (`codex_otel::metrics`). +- A metrics facade on `OtelManager` so tracing + metrics share metadata. + +## Tracing and logs + +Create an OTEL provider from `OtelSettings`, then attach its layers to your +`tracing_subscriber` registry: + +```rust +use codex_otel::config::OtelExporter; +use codex_otel::config::OtelHttpProtocol; +use codex_otel::config::OtelSettings; +use codex_otel::traces::otel_provider::OtelProvider; +use tracing_subscriber::prelude::*; + +let settings = OtelSettings { + environment: "dev".to_string(), + service_name: "codex-cli".to_string(), + service_version: env!("CARGO_PKG_VERSION").to_string(), + codex_home: std::path::PathBuf::from("/tmp"), + exporter: OtelExporter::OtlpHttp { + endpoint: "https://otlp.example.com".to_string(), + headers: std::collections::HashMap::new(), + protocol: OtelHttpProtocol::Binary, + tls: None, + }, + trace_exporter: OtelExporter::OtlpHttp { + endpoint: "https://otlp.example.com".to_string(), + headers: std::collections::HashMap::new(), + protocol: OtelHttpProtocol::Binary, + tls: None, + }, + metrics_exporter: OtelExporter::None, +}; + +if let Some(provider) = OtelProvider::from(&settings)? { + let registry = tracing_subscriber::registry() + .with(provider.logger_layer()) + .with(provider.tracing_layer()); + registry.init(); +} +``` + +## OtelManager (events) + +`OtelManager` adds consistent metadata to tracing events and helps record +Codex-specific events. + +```rust +use codex_otel::OtelManager; + +let manager = OtelManager::new( + conversation_id, + model, + slug, + account_id, + account_email, + auth_mode, + log_user_prompts, + terminal_type, + session_source, +); + +manager.user_prompt(&prompt_items); +``` + +## Metrics (OTLP or in-memory) + +Modes: + +- OTLP: exports metrics via the OpenTelemetry OTLP exporter (HTTP or gRPC). +- In-memory: records via `opentelemetry_sdk::metrics::InMemoryMetricExporter` for tests/assertions; call `shutdown()` to flush. + +`codex-otel` also provides `OtelExporter::Statsig`, a shorthand for exporting OTLP/HTTP JSON metrics +to Statsig using Codex-internal defaults. + +Statsig ingestion (OTLP/HTTP JSON) example: + +```rust +use codex_otel::config::{OtelExporter, OtelHttpProtocol}; + +let metrics = MetricsClient::new(MetricsConfig::otlp( + "dev", + "codex-cli", + env!("CARGO_PKG_VERSION"), + OtelExporter::OtlpHttp { + endpoint: "https://api.statsig.com/otlp".to_string(), + headers: std::collections::HashMap::from([( + "statsig-api-key".to_string(), + std::env::var("STATSIG_SERVER_SDK_SECRET")?, + )]), + protocol: OtelHttpProtocol::Json, + tls: None, + }, +))?; + +metrics.counter("codex.session_started", 1, &[("source", "tui")])?; +metrics.histogram("codex.request_latency", 83, &[("route", "chat")])?; +``` + +In-memory (tests): + +```rust +let exporter = InMemoryMetricExporter::default(); +let metrics = MetricsClient::new(MetricsConfig::in_memory( + "test", + "codex-cli", + env!("CARGO_PKG_VERSION"), + exporter.clone(), +))?; +metrics.counter("codex.turns", 1, &[("model", "gpt-5.1")])?; +metrics.shutdown()?; // flushes in-memory exporter +``` + +## Shutdown + +- `OtelProvider::shutdown()` stops the OTEL exporter. +- `OtelManager::shutdown_metrics()` flushes and shuts down the metrics provider. + +Both are optional because drop performs best-effort shutdown, but calling them +explicitly gives deterministic flushing (or a shutdown error if flushing does +not complete in time). diff --git a/codex-rs/otel/src/config.rs b/codex-rs/otel/src/config.rs index 935c0379fbe..f8f2d5a1063 100644 --- a/codex-rs/otel/src/config.rs +++ b/codex-rs/otel/src/config.rs @@ -3,6 +3,31 @@ use std::path::PathBuf; use codex_utils_absolute_path::AbsolutePathBuf; +pub(crate) const STATSIG_OTLP_HTTP_ENDPOINT: &str = "https://ab.chatgpt.com/otlp/v1/metrics"; +pub(crate) const STATSIG_API_KEY_HEADER: &str = "statsig-api-key"; +pub(crate) const STATSIG_API_KEY: &str = "client-MkRuleRQBd6qakfnDYqJVR9JuXcY57Ljly3vi5JVUIO"; + +pub(crate) fn resolve_exporter(exporter: &OtelExporter) -> OtelExporter { + match exporter { + OtelExporter::Statsig => { + if cfg!(test) || cfg!(feature = "disable-default-metrics-exporter") { + return OtelExporter::None; + } + + OtelExporter::OtlpHttp { + endpoint: STATSIG_OTLP_HTTP_ENDPOINT.to_string(), + headers: HashMap::from([( + STATSIG_API_KEY_HEADER.to_string(), + STATSIG_API_KEY.to_string(), + )]), + protocol: OtelHttpProtocol::Json, + tls: None, + } + } + _ => exporter.clone(), + } +} + #[derive(Clone, Debug)] pub struct OtelSettings { pub environment: String, @@ -11,6 +36,7 @@ pub struct OtelSettings { pub codex_home: PathBuf, pub exporter: OtelExporter, pub trace_exporter: OtelExporter, + pub metrics_exporter: OtelExporter, } #[derive(Clone, Debug)] @@ -31,6 +57,10 @@ pub struct OtelTlsConfig { #[derive(Clone, Debug)] pub enum OtelExporter { None, + /// Statsig metrics ingestion exporter using Codex-internal defaults. + /// + /// This is intended for metrics only. + Statsig, OtlpGrpc { endpoint: String, headers: HashMap, diff --git a/codex-rs/otel/src/lib.rs b/codex-rs/otel/src/lib.rs index 5211c8e89ba..25607623204 100644 --- a/codex-rs/otel/src/lib.rs +++ b/codex-rs/otel/src/lib.rs @@ -1,4 +1,173 @@ pub mod config; +pub mod metrics; +pub mod traces; -pub mod otel_manager; -pub mod otel_provider; +mod otlp; + +use crate::metrics::MetricsClient; +use crate::metrics::MetricsConfig; +use crate::metrics::MetricsError; +use crate::metrics::Result as MetricsResult; +use crate::metrics::timer::Timer; +use crate::metrics::validation::validate_tag_key; +use crate::metrics::validation::validate_tag_value; +use crate::traces::otel_provider::OtelProvider; +use codex_protocol::ThreadId; +use serde::Serialize; +use std::time::Duration; +use strum_macros::Display; +use tracing::Span; + +#[derive(Debug, Clone, Serialize, Display)] +#[serde(rename_all = "snake_case")] +pub enum ToolDecisionSource { + Config, + User, +} + +#[derive(Debug, Clone)] +pub struct OtelEventMetadata { + pub(crate) conversation_id: ThreadId, + pub(crate) auth_mode: Option, + pub(crate) account_id: Option, + pub(crate) account_email: Option, + pub(crate) model: String, + pub(crate) slug: String, + pub(crate) log_user_prompts: bool, + pub(crate) app_version: &'static str, + pub(crate) terminal_type: String, +} + +#[derive(Debug, Clone)] +pub struct OtelManager { + pub(crate) metadata: OtelEventMetadata, + pub(crate) session_span: Span, + pub(crate) metrics: Option, + pub(crate) metrics_use_metadata_tags: bool, +} + +impl OtelManager { + pub fn with_model(mut self, model: &str, slug: &str) -> Self { + self.metadata.model = model.to_owned(); + self.metadata.slug = slug.to_owned(); + self + } + + pub fn with_metrics(mut self, metrics: MetricsClient) -> Self { + self.metrics = Some(metrics); + self.metrics_use_metadata_tags = true; + self + } + + pub fn with_metrics_without_metadata_tags(mut self, metrics: MetricsClient) -> Self { + self.metrics = Some(metrics); + self.metrics_use_metadata_tags = false; + self + } + + pub fn with_metrics_config(self, config: MetricsConfig) -> MetricsResult { + let metrics = MetricsClient::new(config)?; + Ok(self.with_metrics(metrics)) + } + + pub fn with_provider_metrics(self, provider: &OtelProvider) -> Self { + match provider.metrics() { + Some(metrics) => self.with_metrics(metrics.clone()), + None => self, + } + } + + pub fn counter(&self, name: &str, inc: i64, tags: &[(&str, &str)]) { + let res: MetricsResult<()> = (|| { + let Some(metrics) = &self.metrics else { + return Ok(()); + }; + + let tags = self.tags_with_metadata(tags)?; + metrics.counter(name, inc, &tags) + })(); + + if let Err(e) = res { + tracing::warn!("metrics counter [{name}] failed: {e}"); + } + } + + pub fn histogram(&self, name: &str, value: i64, tags: &[(&str, &str)]) { + let res: MetricsResult<()> = (|| { + let Some(metrics) = &self.metrics else { + return Ok(()); + }; + + let tags = self.tags_with_metadata(tags)?; + metrics.histogram(name, value, &tags) + })(); + + if let Err(e) = res { + tracing::warn!("metrics histogram [{name}] failed: {e}"); + } + } + + pub fn record_duration(&self, name: &str, duration: Duration, tags: &[(&str, &str)]) { + let res: MetricsResult<()> = (|| { + let Some(metrics) = &self.metrics else { + return Ok(()); + }; + + let tags = self.tags_with_metadata(tags)?; + metrics.record_duration(name, duration, &tags) + })(); + + if let Err(e) = res { + tracing::warn!("metrics duration [{name}] failed: {e}"); + } + } + + pub fn start_timer(&self, name: &str, tags: &[(&str, &str)]) -> Result { + let Some(metrics) = &self.metrics else { + return Err(MetricsError::ExporterDisabled); + }; + let tags = self.tags_with_metadata(tags)?; + metrics.start_timer(name, &tags) + } + + pub fn shutdown_metrics(&self) -> MetricsResult<()> { + let Some(metrics) = &self.metrics else { + return Ok(()); + }; + metrics.shutdown() + } + + fn tags_with_metadata<'a>( + &'a self, + tags: &'a [(&'a str, &'a str)], + ) -> MetricsResult> { + let mut merged = self.metadata_tag_refs()?; + merged.extend(tags.iter().copied()); + Ok(merged) + } + + fn metadata_tag_refs(&self) -> MetricsResult> { + if !self.metrics_use_metadata_tags { + return Ok(Vec::new()); + } + let mut tags = Vec::with_capacity(5); + Self::push_metadata_tag(&mut tags, "auth_mode", self.metadata.auth_mode.as_deref())?; + Self::push_metadata_tag(&mut tags, "model", Some(self.metadata.model.as_str()))?; + Self::push_metadata_tag(&mut tags, "app.version", Some(self.metadata.app_version))?; + Ok(tags) + } + + fn push_metadata_tag<'a>( + tags: &mut Vec<(&'a str, &'a str)>, + key: &'static str, + value: Option<&'a str>, + ) -> MetricsResult<()> { + let Some(value) = value else { + return Ok(()); + }; + validate_tag_key(key)?; + validate_tag_value(value)?; + tags.push((key, value)); + Ok(()) + } +} diff --git a/codex-rs/otel/src/metrics/client.rs b/codex-rs/otel/src/metrics/client.rs new file mode 100644 index 00000000000..362199d6989 --- /dev/null +++ b/codex-rs/otel/src/metrics/client.rs @@ -0,0 +1,291 @@ +use crate::config::OtelExporter; +use crate::config::OtelHttpProtocol; +use crate::metrics::MetricsError; +use crate::metrics::Result; +use crate::metrics::config::MetricsConfig; +use crate::metrics::config::MetricsExporter; +use crate::metrics::timer::Timer; +use crate::metrics::validation::validate_metric_name; +use crate::metrics::validation::validate_tag_key; +use crate::metrics::validation::validate_tag_value; +use crate::metrics::validation::validate_tags; +use opentelemetry::KeyValue; +use opentelemetry::metrics::Counter; +use opentelemetry::metrics::Histogram; +use opentelemetry::metrics::Meter; +use opentelemetry::metrics::MeterProvider as _; +use opentelemetry_otlp::OTEL_EXPORTER_OTLP_METRICS_TIMEOUT; +use opentelemetry_otlp::Protocol; +use opentelemetry_otlp::WithExportConfig; +use opentelemetry_otlp::WithHttpConfig; +use opentelemetry_otlp::WithTonicConfig; +use opentelemetry_otlp::tonic_types::metadata::MetadataMap; +use opentelemetry_otlp::tonic_types::transport::ClientTlsConfig; +use opentelemetry_sdk::Resource; +use opentelemetry_sdk::metrics::PeriodicReader; +use opentelemetry_sdk::metrics::SdkMeterProvider; +use opentelemetry_sdk::metrics::Temporality; +use opentelemetry_semantic_conventions as semconv; +use std::collections::BTreeMap; +use std::collections::HashMap; +use std::sync::Mutex; +use std::time::Duration; +use tracing::debug; + +const ENV_ATTRIBUTE: &str = "env"; +const METER_NAME: &str = "codex"; + +#[derive(Debug)] +struct MetricsClientInner { + meter_provider: SdkMeterProvider, + meter: Meter, + counters: Mutex>>, + histograms: Mutex>>, + default_tags: BTreeMap, +} + +impl MetricsClientInner { + fn counter(&self, name: &str, inc: i64, tags: &[(&str, &str)]) -> Result<()> { + validate_metric_name(name)?; + if inc < 0 { + return Err(MetricsError::NegativeCounterIncrement { + name: name.to_string(), + inc, + }); + } + let attributes = self.attributes(tags)?; + + let mut counters = self + .counters + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + let counter = counters + .entry(name.to_string()) + .or_insert_with(|| self.meter.u64_counter(name.to_string()).build()); + counter.add(inc as u64, &attributes); + Ok(()) + } + + fn histogram(&self, name: &str, value: i64, tags: &[(&str, &str)]) -> Result<()> { + validate_metric_name(name)?; + let attributes = self.attributes(tags)?; + + let mut histograms = self + .histograms + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + let histogram = histograms + .entry(name.to_string()) + .or_insert_with(|| self.meter.f64_histogram(name.to_string()).build()); + histogram.record(value as f64, &attributes); + Ok(()) + } + + fn attributes(&self, tags: &[(&str, &str)]) -> Result> { + if tags.is_empty() { + return Ok(self + .default_tags + .iter() + .map(|(key, value)| KeyValue::new(key.clone(), value.clone())) + .collect()); + } + + let mut merged = self.default_tags.clone(); + for (key, value) in tags { + validate_tag_key(key)?; + validate_tag_value(value)?; + merged.insert((*key).to_string(), (*value).to_string()); + } + + Ok(merged + .into_iter() + .map(|(key, value)| KeyValue::new(key, value)) + .collect()) + } + + fn shutdown(&self) -> Result<()> { + debug!("flushing OTEL metrics"); + self.meter_provider + .force_flush() + .map_err(|source| MetricsError::ProviderShutdown { source })?; + self.meter_provider + .shutdown() + .map_err(|source| MetricsError::ProviderShutdown { source })?; + Ok(()) + } +} + +/// OpenTelemetry metrics client used by Codex. +#[derive(Clone, Debug)] +pub struct MetricsClient(std::sync::Arc); + +impl MetricsClient { + /// Build a metrics client from configuration and validate defaults. + pub fn new(config: MetricsConfig) -> Result { + validate_tags(&config.default_tags)?; + + let resource = Resource::builder() + .with_service_name(config.service_name.clone()) + .with_attributes(vec![ + KeyValue::new( + semconv::attribute::SERVICE_VERSION, + config.service_version.clone(), + ), + KeyValue::new(ENV_ATTRIBUTE, config.environment.clone()), + ]) + .build(); + + let temporality = Temporality::default(); + let (meter_provider, meter) = match config.exporter { + MetricsExporter::InMemory(exporter) => { + build_provider(resource, exporter, config.export_interval) + } + MetricsExporter::Otlp(exporter) => { + let exporter = build_otlp_metric_exporter(exporter, temporality)?; + build_provider(resource, exporter, config.export_interval) + } + }; + + Ok(Self(std::sync::Arc::new(MetricsClientInner { + meter_provider, + meter, + counters: Mutex::new(HashMap::new()), + histograms: Mutex::new(HashMap::new()), + default_tags: config.default_tags, + }))) + } + + /// Send a single counter increment. + pub fn counter(&self, name: &str, inc: i64, tags: &[(&str, &str)]) -> Result<()> { + self.0.counter(name, inc, tags) + } + + /// Send a single histogram sample. + pub fn histogram(&self, name: &str, value: i64, tags: &[(&str, &str)]) -> Result<()> { + self.0.histogram(name, value, tags) + } + + /// Record a duration in milliseconds using a histogram. + pub fn record_duration( + &self, + name: &str, + duration: Duration, + tags: &[(&str, &str)], + ) -> Result<()> { + self.histogram( + name, + duration.as_millis().min(i64::MAX as u128) as i64, + tags, + ) + } + + pub fn start_timer( + &self, + name: &str, + tags: &[(&str, &str)], + ) -> std::result::Result { + Ok(Timer::new(name, tags, self)) + } + + /// Flush metrics and stop the underlying OTEL meter provider. + pub fn shutdown(&self) -> Result<()> { + self.0.shutdown() + } +} + +fn build_provider( + resource: Resource, + exporter: E, + interval: Option, +) -> (SdkMeterProvider, Meter) +where + E: opentelemetry_sdk::metrics::exporter::PushMetricExporter + 'static, +{ + let mut reader_builder = PeriodicReader::builder(exporter); + if let Some(interval) = interval { + reader_builder = reader_builder.with_interval(interval); + } + let reader = reader_builder.build(); + let provider = SdkMeterProvider::builder() + .with_resource(resource) + .with_reader(reader) + .build(); + let meter = provider.meter(METER_NAME); + (provider, meter) +} + +fn build_otlp_metric_exporter( + exporter: OtelExporter, + temporality: Temporality, +) -> Result { + match exporter { + OtelExporter::None => Err(MetricsError::ExporterDisabled), + OtelExporter::Statsig => build_otlp_metric_exporter( + crate::config::resolve_exporter(&OtelExporter::Statsig), + temporality, + ), + OtelExporter::OtlpGrpc { + endpoint, + headers, + tls, + } => { + debug!("Using OTLP Grpc exporter for metrics: {endpoint}"); + + let header_map = crate::otlp::build_header_map(&headers); + + let base_tls_config = ClientTlsConfig::new() + .with_enabled_roots() + .assume_http2(true); + + let tls_config = match tls.as_ref() { + Some(tls) => crate::otlp::build_grpc_tls_config(&endpoint, base_tls_config, tls) + .map_err(|err| MetricsError::InvalidConfig { + message: err.to_string(), + })?, + None => base_tls_config, + }; + + opentelemetry_otlp::MetricExporter::builder() + .with_tonic() + .with_endpoint(endpoint) + .with_temporality(temporality) + .with_metadata(MetadataMap::from_headers(header_map)) + .with_tls_config(tls_config) + .build() + .map_err(|source| MetricsError::ExporterBuild { source }) + } + OtelExporter::OtlpHttp { + endpoint, + headers, + protocol, + tls, + } => { + debug!("Using OTLP Http exporter for metrics: {endpoint}"); + + let protocol = match protocol { + OtelHttpProtocol::Binary => Protocol::HttpBinary, + OtelHttpProtocol::Json => Protocol::HttpJson, + }; + + let mut exporter_builder = opentelemetry_otlp::MetricExporter::builder() + .with_http() + .with_endpoint(endpoint) + .with_temporality(temporality) + .with_protocol(protocol) + .with_headers(headers); + + if let Some(tls) = tls.as_ref() { + let client = + crate::otlp::build_http_client(tls, OTEL_EXPORTER_OTLP_METRICS_TIMEOUT) + .map_err(|err| MetricsError::InvalidConfig { + message: err.to_string(), + })?; + exporter_builder = exporter_builder.with_http_client(client); + } + + exporter_builder + .build() + .map_err(|source| MetricsError::ExporterBuild { source }) + } + } +} diff --git a/codex-rs/otel/src/metrics/config.rs b/codex-rs/otel/src/metrics/config.rs new file mode 100644 index 00000000000..c7a459183be --- /dev/null +++ b/codex-rs/otel/src/metrics/config.rs @@ -0,0 +1,74 @@ +use crate::config::OtelExporter; +use crate::metrics::Result; +use crate::metrics::validation::validate_tag_key; +use crate::metrics::validation::validate_tag_value; +use opentelemetry_sdk::metrics::InMemoryMetricExporter; +use std::collections::BTreeMap; +use std::time::Duration; + +#[derive(Clone, Debug)] +pub enum MetricsExporter { + Otlp(OtelExporter), + InMemory(InMemoryMetricExporter), +} + +#[derive(Clone, Debug)] +pub struct MetricsConfig { + pub(crate) environment: String, + pub(crate) service_name: String, + pub(crate) service_version: String, + pub(crate) exporter: MetricsExporter, + pub(crate) export_interval: Option, + pub(crate) default_tags: BTreeMap, +} + +impl MetricsConfig { + pub fn otlp( + environment: impl Into, + service_name: impl Into, + service_version: impl Into, + exporter: OtelExporter, + ) -> Self { + Self { + environment: environment.into(), + service_name: service_name.into(), + service_version: service_version.into(), + exporter: MetricsExporter::Otlp(exporter), + export_interval: None, + default_tags: BTreeMap::new(), + } + } + + /// Create an in-memory config (used in tests). + pub fn in_memory( + environment: impl Into, + service_name: impl Into, + service_version: impl Into, + exporter: InMemoryMetricExporter, + ) -> Self { + Self { + environment: environment.into(), + service_name: service_name.into(), + service_version: service_version.into(), + exporter: MetricsExporter::InMemory(exporter), + export_interval: None, + default_tags: BTreeMap::new(), + } + } + + /// Override the interval between periodic metric exports. + pub fn with_export_interval(mut self, interval: Duration) -> Self { + self.export_interval = Some(interval); + self + } + + /// Add a default tag that will be sent with every metric. + pub fn with_tag(mut self, key: impl Into, value: impl Into) -> Result { + let key = key.into(); + let value = value.into(); + validate_tag_key(&key)?; + validate_tag_value(&value)?; + self.default_tags.insert(key, value); + Ok(self) + } +} diff --git a/codex-rs/otel/src/metrics/error.rs b/codex-rs/otel/src/metrics/error.rs new file mode 100644 index 00000000000..dfb9653254a --- /dev/null +++ b/codex-rs/otel/src/metrics/error.rs @@ -0,0 +1,37 @@ +use thiserror::Error; + +pub type Result = std::result::Result; + +#[derive(Debug, Error)] +pub enum MetricsError { + // Metrics. + #[error("metric name cannot be empty")] + EmptyMetricName, + #[error("metric name contains invalid characters: {name}")] + InvalidMetricName { name: String }, + #[error("{label} cannot be empty")] + EmptyTagComponent { label: String }, + #[error("{label} contains invalid characters: {value}")] + InvalidTagComponent { label: String, value: String }, + + #[error("metrics exporter is disabled")] + ExporterDisabled, + + #[error("counter increment must be non-negative for {name}: {inc}")] + NegativeCounterIncrement { name: String, inc: i64 }, + + #[error("failed to build OTLP metrics exporter")] + ExporterBuild { + #[source] + source: opentelemetry_otlp::ExporterBuildError, + }, + + #[error("invalid OTLP metrics configuration: {message}")] + InvalidConfig { message: String }, + + #[error("failed to flush or shutdown metrics provider")] + ProviderShutdown { + #[source] + source: opentelemetry_sdk::error::OTelSdkError, + }, +} diff --git a/codex-rs/otel/src/metrics/mod.rs b/codex-rs/otel/src/metrics/mod.rs new file mode 100644 index 00000000000..b13d5f917e3 --- /dev/null +++ b/codex-rs/otel/src/metrics/mod.rs @@ -0,0 +1,22 @@ +mod client; +mod config; +mod error; +pub(crate) mod timer; +pub(crate) mod validation; + +pub use crate::metrics::client::MetricsClient; +pub use crate::metrics::config::MetricsConfig; +pub use crate::metrics::config::MetricsExporter; +pub use crate::metrics::error::MetricsError; +pub use crate::metrics::error::Result; +use std::sync::OnceLock; + +static GLOBAL_METRICS: OnceLock = OnceLock::new(); + +pub(crate) fn install_global(metrics: MetricsClient) { + let _ = GLOBAL_METRICS.set(metrics); +} + +pub(crate) fn global() -> Option { + GLOBAL_METRICS.get().cloned() +} diff --git a/codex-rs/otel/src/metrics/timer.rs b/codex-rs/otel/src/metrics/timer.rs new file mode 100644 index 00000000000..b1624fda163 --- /dev/null +++ b/codex-rs/otel/src/metrics/timer.rs @@ -0,0 +1,42 @@ +use crate::metrics::MetricsClient; +use crate::metrics::error::Result; +use std::time::Instant; + +pub struct Timer { + name: String, + tags: Vec<(String, String)>, + client: MetricsClient, + start_time: Instant, +} + +impl Drop for Timer { + fn drop(&mut self) { + if let Err(e) = self.record() { + tracing::error!("metrics client error: {}", e); + } + } +} + +impl Timer { + pub(crate) fn new(name: &str, tags: &[(&str, &str)], client: &MetricsClient) -> Self { + Self { + name: name.to_string(), + tags: tags + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(), + client: client.clone(), + start_time: Instant::now(), + } + } + + pub fn record(&self) -> Result<()> { + let tags = self + .tags + .iter() + .map(|(k, v)| (k.as_str(), v.as_str())) + .collect::>(); + self.client + .record_duration(&self.name, self.start_time.elapsed(), &tags) + } +} diff --git a/codex-rs/otel/src/metrics/validation.rs b/codex-rs/otel/src/metrics/validation.rs new file mode 100644 index 00000000000..ce9e436d8f5 --- /dev/null +++ b/codex-rs/otel/src/metrics/validation.rs @@ -0,0 +1,55 @@ +use crate::metrics::error::MetricsError; +use crate::metrics::error::Result; +use std::collections::BTreeMap; + +pub(crate) fn validate_tags(tags: &BTreeMap) -> Result<()> { + for (key, value) in tags { + validate_tag_key(key)?; + validate_tag_value(value)?; + } + Ok(()) +} + +pub(crate) fn validate_metric_name(name: &str) -> Result<()> { + if name.is_empty() { + return Err(MetricsError::EmptyMetricName); + } + if !name.chars().all(is_metric_char) { + return Err(MetricsError::InvalidMetricName { + name: name.to_string(), + }); + } + Ok(()) +} + +pub(crate) fn validate_tag_key(key: &str) -> Result<()> { + validate_tag_component(key, "tag key")?; + Ok(()) +} + +pub(crate) fn validate_tag_value(value: &str) -> Result<()> { + validate_tag_component(value, "tag value") +} + +fn validate_tag_component(value: &str, label: &str) -> Result<()> { + if value.is_empty() { + return Err(MetricsError::EmptyTagComponent { + label: label.to_string(), + }); + } + if !value.chars().all(is_tag_char) { + return Err(MetricsError::InvalidTagComponent { + label: label.to_string(), + value: value.to_string(), + }); + } + Ok(()) +} + +fn is_metric_char(c: char) -> bool { + c.is_ascii_alphanumeric() || matches!(c, '.' | '_' | '-') +} + +fn is_tag_char(c: char) -> bool { + c.is_ascii_alphanumeric() || matches!(c, '.' | '_' | '-' | '/') +} diff --git a/codex-rs/otel/src/otlp.rs b/codex-rs/otel/src/otlp.rs new file mode 100644 index 00000000000..c70e5e55e9e --- /dev/null +++ b/codex-rs/otel/src/otlp.rs @@ -0,0 +1,163 @@ +use crate::config::OtelTlsConfig; +use codex_utils_absolute_path::AbsolutePathBuf; +use http::Uri; +use opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT; +use opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT; +use opentelemetry_otlp::tonic_types::transport::Certificate as TonicCertificate; +use opentelemetry_otlp::tonic_types::transport::ClientTlsConfig; +use opentelemetry_otlp::tonic_types::transport::Identity as TonicIdentity; +use reqwest::Certificate as ReqwestCertificate; +use reqwest::Identity as ReqwestIdentity; +use reqwest::header::HeaderMap; +use reqwest::header::HeaderName; +use reqwest::header::HeaderValue; +use std::env; +use std::error::Error; +use std::fs; +use std::io; +use std::io::ErrorKind; +use std::path::PathBuf; +use std::time::Duration; + +pub(crate) fn build_header_map(headers: &std::collections::HashMap) -> HeaderMap { + let mut header_map = HeaderMap::new(); + for (key, value) in headers { + if let Ok(name) = HeaderName::from_bytes(key.as_bytes()) + && let Ok(val) = HeaderValue::from_str(value) + { + header_map.insert(name, val); + } + } + header_map +} + +pub(crate) fn build_grpc_tls_config( + endpoint: &str, + tls_config: ClientTlsConfig, + tls: &OtelTlsConfig, +) -> Result> { + let uri: Uri = endpoint.parse()?; + let host = uri.host().ok_or_else(|| { + config_error(format!( + "OTLP gRPC endpoint {endpoint} does not include a host" + )) + })?; + + let mut config = tls_config.domain_name(host.to_owned()); + + if let Some(path) = tls.ca_certificate.as_ref() { + let (pem, _) = read_bytes(path)?; + config = config.ca_certificate(TonicCertificate::from_pem(pem)); + } + + match (&tls.client_certificate, &tls.client_private_key) { + (Some(cert_path), Some(key_path)) => { + let (cert_pem, _) = read_bytes(cert_path)?; + let (key_pem, _) = read_bytes(key_path)?; + config = config.identity(TonicIdentity::from_pem(cert_pem, key_pem)); + } + (Some(_), None) | (None, Some(_)) => { + return Err(config_error( + "client_certificate and client_private_key must both be provided for mTLS", + )); + } + (None, None) => {} + } + + Ok(config) +} + +/// Build a blocking HTTP client with TLS configuration for OTLP HTTP exporters. +/// +/// We use `reqwest::blocking::Client` because OTEL exporters run on dedicated +/// OS threads that are not necessarily backed by tokio. +pub(crate) fn build_http_client( + tls: &OtelTlsConfig, + timeout_var: &str, +) -> Result> { + if tokio::runtime::Handle::try_current().is_ok() { + tokio::task::block_in_place(|| build_http_client_inner(tls, timeout_var)) + } else { + build_http_client_inner(tls, timeout_var) + } +} + +fn build_http_client_inner( + tls: &OtelTlsConfig, + timeout_var: &str, +) -> Result> { + let mut builder = + reqwest::blocking::Client::builder().timeout(resolve_otlp_timeout(timeout_var)); + + if let Some(path) = tls.ca_certificate.as_ref() { + let (pem, location) = read_bytes(path)?; + let certificate = ReqwestCertificate::from_pem(pem.as_slice()).map_err(|error| { + config_error(format!( + "failed to parse certificate {}: {error}", + location.display() + )) + })?; + builder = builder + .tls_built_in_root_certs(false) + .add_root_certificate(certificate); + } + + match (&tls.client_certificate, &tls.client_private_key) { + (Some(cert_path), Some(key_path)) => { + let (mut cert_pem, cert_location) = read_bytes(cert_path)?; + let (key_pem, key_location) = read_bytes(key_path)?; + cert_pem.extend_from_slice(key_pem.as_slice()); + let identity = ReqwestIdentity::from_pem(cert_pem.as_slice()).map_err(|error| { + config_error(format!( + "failed to parse client identity using {} and {}: {error}", + cert_location.display(), + key_location.display() + )) + })?; + builder = builder.identity(identity).https_only(true); + } + (Some(_), None) | (None, Some(_)) => { + return Err(config_error( + "client_certificate and client_private_key must both be provided for mTLS", + )); + } + (None, None) => {} + } + + builder + .build() + .map_err(|error| Box::new(error) as Box) +} + +pub(crate) fn resolve_otlp_timeout(signal_var: &str) -> Duration { + if let Some(timeout) = read_timeout_env(signal_var) { + return timeout; + } + if let Some(timeout) = read_timeout_env(OTEL_EXPORTER_OTLP_TIMEOUT) { + return timeout; + } + OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT +} + +fn read_timeout_env(var: &str) -> Option { + let value = env::var(var).ok()?; + let parsed = value.parse::().ok()?; + if parsed < 0 { + return None; + } + Some(Duration::from_millis(parsed as u64)) +} + +fn read_bytes(path: &AbsolutePathBuf) -> Result<(Vec, PathBuf), Box> { + match fs::read(path) { + Ok(bytes) => Ok((bytes, path.to_path_buf())), + Err(error) => Err(Box::new(io::Error::new( + error.kind(), + format!("failed to read {}: {error}", path.display()), + ))), + } +} + +fn config_error(message: impl Into) -> Box { + Box::new(io::Error::new(ErrorKind::InvalidData, message.into())) +} diff --git a/codex-rs/otel/src/traces/mod.rs b/codex-rs/otel/src/traces/mod.rs new file mode 100644 index 00000000000..a58949f9ab3 --- /dev/null +++ b/codex-rs/otel/src/traces/mod.rs @@ -0,0 +1,2 @@ +pub mod otel_manager; +pub mod otel_provider; diff --git a/codex-rs/otel/src/otel_manager.rs b/codex-rs/otel/src/traces/otel_manager.rs similarity index 95% rename from codex-rs/otel/src/otel_manager.rs rename to codex-rs/otel/src/traces/otel_manager.rs index c59d3a168f6..f431f1a527f 100644 --- a/codex-rs/otel/src/otel_manager.rs +++ b/codex-rs/otel/src/traces/otel_manager.rs @@ -1,4 +1,4 @@ -use crate::otel_provider::traceparent_context_from_env; +use crate::traces::otel_provider::traceparent_context_from_env; use chrono::SecondsFormat; use chrono::Utc; use codex_api::ResponseEvent; @@ -16,43 +16,19 @@ use eventsource_stream::Event as StreamEvent; use eventsource_stream::EventStreamError as StreamError; use reqwest::Error; use reqwest::Response; -use serde::Serialize; use std::borrow::Cow; use std::fmt::Display; use std::future::Future; use std::time::Duration; use std::time::Instant; -use strum_macros::Display; use tokio::time::error::Elapsed; use tracing::Span; use tracing::trace_span; use tracing_opentelemetry::OpenTelemetrySpanExt; -#[derive(Debug, Clone, Serialize, Display)] -#[serde(rename_all = "snake_case")] -pub enum ToolDecisionSource { - Config, - User, -} - -#[derive(Debug, Clone)] -pub struct OtelEventMetadata { - conversation_id: ThreadId, - auth_mode: Option, - account_id: Option, - account_email: Option, - model: String, - slug: String, - log_user_prompts: bool, - app_version: &'static str, - terminal_type: String, -} - -#[derive(Debug, Clone)] -pub struct OtelManager { - metadata: OtelEventMetadata, - session_span: Span, -} +pub use crate::OtelEventMetadata; +pub use crate::OtelManager; +pub use crate::ToolDecisionSource; impl OtelManager { #[allow(clippy::too_many_arguments)] @@ -86,16 +62,11 @@ impl OtelManager { terminal_type, }, session_span, + metrics: crate::metrics::global(), + metrics_use_metadata_tags: true, } } - pub fn with_model(&self, model: &str, slug: &str) -> Self { - let mut manager = self.clone(); - manager.metadata.model = model.to_owned(); - manager.metadata.slug = slug.to_owned(); - manager - } - pub fn current_span(&self) -> &Span { &self.session_span } @@ -162,7 +133,7 @@ impl OtelManager { F: FnOnce() -> Fut, Fut: Future>, { - let start = std::time::Instant::now(); + let start = Instant::now(); let response = f().await; let duration = start.elapsed(); diff --git a/codex-rs/otel/src/otel_provider.rs b/codex-rs/otel/src/traces/otel_provider.rs similarity index 63% rename from codex-rs/otel/src/otel_provider.rs rename to codex-rs/otel/src/traces/otel_provider.rs index 8a777e7fdd8..b6a542d4bda 100644 --- a/codex-rs/otel/src/otel_provider.rs +++ b/codex-rs/otel/src/traces/otel_provider.rs @@ -1,9 +1,8 @@ use crate::config::OtelExporter; use crate::config::OtelHttpProtocol; use crate::config::OtelSettings; -use crate::config::OtelTlsConfig; -use codex_utils_absolute_path::AbsolutePathBuf; -use http::Uri; +use crate::metrics::MetricsClient; +use crate::metrics::MetricsConfig; use opentelemetry::Context; use opentelemetry::KeyValue; use opentelemetry::context::ContextGuard; @@ -14,8 +13,6 @@ use opentelemetry::trace::TracerProvider as _; use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; use opentelemetry_otlp::LogExporter; use opentelemetry_otlp::OTEL_EXPORTER_OTLP_LOGS_TIMEOUT; -use opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT; -use opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT; use opentelemetry_otlp::OTEL_EXPORTER_OTLP_TRACES_TIMEOUT; use opentelemetry_otlp::Protocol; use opentelemetry_otlp::SpanExporter; @@ -23,9 +20,7 @@ use opentelemetry_otlp::WithExportConfig; use opentelemetry_otlp::WithHttpConfig; use opentelemetry_otlp::WithTonicConfig; use opentelemetry_otlp::tonic_types::metadata::MetadataMap; -use opentelemetry_otlp::tonic_types::transport::Certificate as TonicCertificate; use opentelemetry_otlp::tonic_types::transport::ClientTlsConfig; -use opentelemetry_otlp::tonic_types::transport::Identity as TonicIdentity; use opentelemetry_sdk::Resource; use opentelemetry_sdk::logs::SdkLoggerProvider; use opentelemetry_sdk::propagation::TraceContextPropagator; @@ -33,21 +28,11 @@ use opentelemetry_sdk::trace::BatchSpanProcessor; use opentelemetry_sdk::trace::SdkTracerProvider; use opentelemetry_sdk::trace::Tracer; use opentelemetry_semantic_conventions as semconv; -use reqwest::Certificate as ReqwestCertificate; -use reqwest::Identity as ReqwestIdentity; -use reqwest::header::HeaderMap; -use reqwest::header::HeaderName; -use reqwest::header::HeaderValue; use std::cell::RefCell; use std::collections::HashMap; use std::env; use std::error::Error; -use std::fs; -use std::io::ErrorKind; -use std::io::{self}; -use std::path::PathBuf; use std::sync::OnceLock; -use std::time::Duration; use tracing::debug; use tracing::level_filters::LevelFilter; use tracing::warn; @@ -63,10 +48,12 @@ thread_local! { static TRACEPARENT_GUARD: RefCell> = const { RefCell::new(None) }; } +// TODO(jif) move OtelProvider out of `traces/` pub struct OtelProvider { pub logger: Option, pub tracer_provider: Option, pub tracer: Option, + pub metrics: Option, } impl OtelProvider { @@ -77,14 +64,33 @@ impl OtelProvider { if let Some(tracer_provider) = &self.tracer_provider { let _ = tracer_provider.shutdown(); } + if let Some(metrics) = &self.metrics { + let _ = metrics.shutdown(); + } } pub fn from(settings: &OtelSettings) -> Result, Box> { let log_enabled = !matches!(settings.exporter, OtelExporter::None); let trace_enabled = !matches!(settings.trace_exporter, OtelExporter::None); - if !log_enabled && !trace_enabled { - debug!("No exporter enabled in OTLP settings."); + let metric_exporter = crate::config::resolve_exporter(&settings.metrics_exporter); + let metrics = if matches!(metric_exporter, OtelExporter::None) { + None + } else { + Some(MetricsClient::new(MetricsConfig::otlp( + settings.environment.clone(), + settings.service_name.clone(), + settings.service_version.clone(), + metric_exporter, + ))?) + }; + + if let Some(metrics) = metrics.as_ref() { + crate::metrics::install_global(metrics.clone()); + } + + if !log_enabled && !trace_enabled && metrics.is_none() { + debug!("No OTEL exporter enabled in settings."); return Ok(None); } @@ -113,6 +119,7 @@ impl OtelProvider { logger, tracer_provider, tracer, + metrics, })) } @@ -141,6 +148,10 @@ impl OtelProvider { pub fn codex_export_filter(meta: &tracing::Metadata<'_>) -> bool { meta.target().starts_with("codex_otel") } + + pub fn metrics(&self) -> Option<&MetricsClient> { + self.metrics.as_ref() + } } impl Drop for OtelProvider { @@ -151,6 +162,9 @@ impl Drop for OtelProvider { if let Some(tracer_provider) = &self.tracer_provider { let _ = tracer_provider.shutdown(); } + if let Some(metrics) = &self.metrics { + let _ = metrics.shutdown(); + } } } @@ -223,8 +237,9 @@ fn build_logger( ) -> Result> { let mut builder = SdkLoggerProvider::builder().with_resource(resource.clone()); - match exporter { + match crate::config::resolve_exporter(exporter) { OtelExporter::None => return Ok(builder.build()), + OtelExporter::Statsig => unreachable!("statsig exporter should be resolved"), OtelExporter::OtlpGrpc { endpoint, headers, @@ -232,14 +247,14 @@ fn build_logger( } => { debug!("Using OTLP Grpc exporter: {endpoint}"); - let header_map = build_header_map(headers); + let header_map = crate::otlp::build_header_map(&headers); let base_tls_config = ClientTlsConfig::new() .with_enabled_roots() .assume_http2(true); let tls_config = match tls.as_ref() { - Some(tls) => build_grpc_tls_config(endpoint, base_tls_config, tls)?, + Some(tls) => crate::otlp::build_grpc_tls_config(&endpoint, base_tls_config, tls)?, None => base_tls_config, }; @@ -269,10 +284,10 @@ fn build_logger( .with_http() .with_endpoint(endpoint) .with_protocol(protocol) - .with_headers(headers.clone()); + .with_headers(headers); if let Some(tls) = tls.as_ref() { - let client = build_http_client(tls, OTEL_EXPORTER_OTLP_LOGS_TIMEOUT)?; + let client = crate::otlp::build_http_client(tls, OTEL_EXPORTER_OTLP_LOGS_TIMEOUT)?; exporter_builder = exporter_builder.with_http_client(client); } @@ -289,8 +304,9 @@ fn build_tracer_provider( resource: &Resource, exporter: &OtelExporter, ) -> Result> { - let span_exporter = match exporter { + let span_exporter = match crate::config::resolve_exporter(exporter) { OtelExporter::None => return Ok(SdkTracerProvider::builder().build()), + OtelExporter::Statsig => unreachable!("statsig exporter should be resolved"), OtelExporter::OtlpGrpc { endpoint, headers, @@ -298,14 +314,14 @@ fn build_tracer_provider( } => { debug!("Using OTLP Grpc exporter for traces: {endpoint}"); - let header_map = build_header_map(headers); + let header_map = crate::otlp::build_header_map(&headers); let base_tls_config = ClientTlsConfig::new() .with_enabled_roots() .assume_http2(true); let tls_config = match tls.as_ref() { - Some(tls) => build_grpc_tls_config(endpoint, base_tls_config, tls)?, + Some(tls) => crate::otlp::build_grpc_tls_config(&endpoint, base_tls_config, tls)?, None => base_tls_config, }; @@ -333,10 +349,11 @@ fn build_tracer_provider( .with_http() .with_endpoint(endpoint) .with_protocol(protocol) - .with_headers(headers.clone()); + .with_headers(headers); if let Some(tls) = tls.as_ref() { - let client = build_http_client(tls, OTEL_EXPORTER_OTLP_TRACES_TIMEOUT)?; + let client = + crate::otlp::build_http_client(tls, OTEL_EXPORTER_OTLP_TRACES_TIMEOUT)?; exporter_builder = exporter_builder.with_http_client(client); } @@ -352,150 +369,6 @@ fn build_tracer_provider( .build()) } -fn build_header_map(headers: &HashMap) -> HeaderMap { - let mut header_map = HeaderMap::new(); - for (key, value) in headers { - if let Ok(name) = HeaderName::from_bytes(key.as_bytes()) - && let Ok(val) = HeaderValue::from_str(value) - { - header_map.insert(name, val); - } - } - header_map -} - -fn build_grpc_tls_config( - endpoint: &str, - tls_config: ClientTlsConfig, - tls: &OtelTlsConfig, -) -> Result> { - let uri: Uri = endpoint.parse()?; - let host = uri.host().ok_or_else(|| { - config_error(format!( - "OTLP gRPC endpoint {endpoint} does not include a host" - )) - })?; - - let mut config = tls_config.domain_name(host.to_owned()); - - if let Some(path) = tls.ca_certificate.as_ref() { - let (pem, _) = read_bytes(path)?; - config = config.ca_certificate(TonicCertificate::from_pem(pem)); - } - - match (&tls.client_certificate, &tls.client_private_key) { - (Some(cert_path), Some(key_path)) => { - let (cert_pem, _) = read_bytes(cert_path)?; - let (key_pem, _) = read_bytes(key_path)?; - config = config.identity(TonicIdentity::from_pem(cert_pem, key_pem)); - } - (Some(_), None) | (None, Some(_)) => { - return Err(config_error( - "client_certificate and client_private_key must both be provided for mTLS", - )); - } - (None, None) => {} - } - - Ok(config) -} - -/// Build a blocking HTTP client with TLS configuration for the OTLP HTTP exporter. -/// -/// We use `reqwest::blocking::Client` instead of the async client because the -/// `opentelemetry_sdk` `BatchLogProcessor` spawns a dedicated OS thread that uses -/// `futures_executor::block_on()` rather than tokio. When the async reqwest client's -/// timeout calls `tokio::time::sleep()`, it panics with "no reactor running". -fn build_http_client( - tls: &OtelTlsConfig, - timeout_var: &str, -) -> Result> { - // Wrap in block_in_place because reqwest::blocking::Client creates its own - // internal tokio runtime, which would panic if built directly from an async context. - tokio::task::block_in_place(|| build_http_client_inner(tls, timeout_var)) -} - -fn build_http_client_inner( - tls: &OtelTlsConfig, - timeout_var: &str, -) -> Result> { - let mut builder = - reqwest::blocking::Client::builder().timeout(resolve_otlp_timeout(timeout_var)); - - if let Some(path) = tls.ca_certificate.as_ref() { - let (pem, location) = read_bytes(path)?; - let certificate = ReqwestCertificate::from_pem(pem.as_slice()).map_err(|error| { - config_error(format!( - "failed to parse certificate {}: {error}", - location.display() - )) - })?; - // Disable built-in root certificates and use only our custom CA - builder = builder - .tls_built_in_root_certs(false) - .add_root_certificate(certificate); - } - - match (&tls.client_certificate, &tls.client_private_key) { - (Some(cert_path), Some(key_path)) => { - let (mut cert_pem, cert_location) = read_bytes(cert_path)?; - let (key_pem, key_location) = read_bytes(key_path)?; - cert_pem.extend_from_slice(key_pem.as_slice()); - let identity = ReqwestIdentity::from_pem(cert_pem.as_slice()).map_err(|error| { - config_error(format!( - "failed to parse client identity using {} and {}: {error}", - cert_location.display(), - key_location.display() - )) - })?; - builder = builder.identity(identity).https_only(true); - } - (Some(_), None) | (None, Some(_)) => { - return Err(config_error( - "client_certificate and client_private_key must both be provided for mTLS", - )); - } - (None, None) => {} - } - - builder - .build() - .map_err(|error| Box::new(error) as Box) -} - -fn resolve_otlp_timeout(signal_var: &str) -> Duration { - if let Some(timeout) = read_timeout_env(signal_var) { - return timeout; - } - if let Some(timeout) = read_timeout_env(OTEL_EXPORTER_OTLP_TIMEOUT) { - return timeout; - } - OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT -} - -fn read_timeout_env(var: &str) -> Option { - let value = env::var(var).ok()?; - let parsed = value.parse::().ok()?; - if parsed < 0 { - return None; - } - Some(Duration::from_millis(parsed as u64)) -} - -fn read_bytes(path: &AbsolutePathBuf) -> Result<(Vec, PathBuf), Box> { - match fs::read(path) { - Ok(bytes) => Ok((bytes, path.to_path_buf())), - Err(error) => Err(Box::new(io::Error::new( - error.kind(), - format!("failed to read {}: {error}", path.display()), - ))), - } -} - -fn config_error(message: impl Into) -> Box { - Box::new(io::Error::new(ErrorKind::InvalidData, message.into())) -} - #[cfg(test)] mod tests { use super::*; diff --git a/codex-rs/otel/tests/harness/mod.rs b/codex-rs/otel/tests/harness/mod.rs new file mode 100644 index 00000000000..acdba0b7e11 --- /dev/null +++ b/codex-rs/otel/tests/harness/mod.rs @@ -0,0 +1,81 @@ +use codex_otel::metrics::MetricsClient; +use codex_otel::metrics::MetricsConfig; +use codex_otel::metrics::Result; +use opentelemetry::KeyValue; +use opentelemetry_sdk::metrics::InMemoryMetricExporter; +use opentelemetry_sdk::metrics::data::AggregatedMetrics; +use opentelemetry_sdk::metrics::data::Metric; +use opentelemetry_sdk::metrics::data::MetricData; +use opentelemetry_sdk::metrics::data::ResourceMetrics; +use std::collections::BTreeMap; + +pub(crate) fn build_metrics_with_defaults( + default_tags: &[(&str, &str)], +) -> Result<(MetricsClient, InMemoryMetricExporter)> { + let exporter = InMemoryMetricExporter::default(); + let mut config = MetricsConfig::in_memory( + "test", + "codex-cli", + env!("CARGO_PKG_VERSION"), + exporter.clone(), + ); + for (key, value) in default_tags { + config = config.with_tag(*key, *value)?; + } + let metrics = MetricsClient::new(config)?; + Ok((metrics, exporter)) +} + +pub(crate) fn latest_metrics(exporter: &InMemoryMetricExporter) -> ResourceMetrics { + let Ok(metrics) = exporter.get_finished_metrics() else { + panic!("finished metrics error"); + }; + let Some(metrics) = metrics.into_iter().last() else { + panic!("metrics export missing"); + }; + metrics +} + +pub(crate) fn find_metric<'a>( + resource_metrics: &'a ResourceMetrics, + name: &str, +) -> Option<&'a Metric> { + for scope_metrics in resource_metrics.scope_metrics() { + for metric in scope_metrics.metrics() { + if metric.name() == name { + return Some(metric); + } + } + } + None +} + +pub(crate) fn attributes_to_map<'a>( + attributes: impl Iterator, +) -> BTreeMap { + attributes + .map(|kv| (kv.key.as_str().to_string(), kv.value.as_str().to_string())) + .collect() +} + +pub(crate) fn histogram_data( + resource_metrics: &ResourceMetrics, + name: &str, +) -> (Vec, Vec, f64, u64) { + let metric = + find_metric(resource_metrics, name).unwrap_or_else(|| panic!("metric {name} missing")); + match metric.data() { + AggregatedMetrics::F64(data) => match data { + MetricData::Histogram(histogram) => { + let points: Vec<_> = histogram.data_points().collect(); + assert_eq!(points.len(), 1); + let point = points[0]; + let bounds = point.bounds().collect(); + let bucket_counts = point.bucket_counts().collect(); + (bounds, bucket_counts, point.sum(), point.count()) + } + _ => panic!("unexpected histogram aggregation"), + }, + _ => panic!("unexpected metric data type"), + } +} diff --git a/codex-rs/otel/tests/suite/manager_metrics.rs b/codex-rs/otel/tests/suite/manager_metrics.rs new file mode 100644 index 00000000000..1497a5f84c7 --- /dev/null +++ b/codex-rs/otel/tests/suite/manager_metrics.rs @@ -0,0 +1,104 @@ +use crate::harness::attributes_to_map; +use crate::harness::build_metrics_with_defaults; +use crate::harness::find_metric; +use crate::harness::latest_metrics; +use codex_app_server_protocol::AuthMode; +use codex_otel::OtelManager; +use codex_otel::metrics::Result; +use codex_protocol::ThreadId; +use codex_protocol::protocol::SessionSource; +use opentelemetry_sdk::metrics::data::AggregatedMetrics; +use opentelemetry_sdk::metrics::data::MetricData; +use pretty_assertions::assert_eq; +use std::collections::BTreeMap; + +// Ensures OtelManager attaches metadata tags when forwarding metrics. +#[test] +fn manager_attaches_metadata_tags_to_metrics() -> Result<()> { + let (metrics, exporter) = build_metrics_with_defaults(&[("service", "codex-cli")])?; + let manager = OtelManager::new( + ThreadId::new(), + "gpt-5.1", + "gpt-5.1", + Some("account-id".to_string()), + None, + Some(AuthMode::ApiKey), + true, + "tty".to_string(), + SessionSource::Cli, + ) + .with_metrics(metrics); + + manager.counter("codex.session_started", 1, &[("source", "tui")]); + manager.shutdown_metrics()?; + + let resource_metrics = latest_metrics(&exporter); + let metric = + find_metric(&resource_metrics, "codex.session_started").expect("counter metric missing"); + let attrs = match metric.data() { + AggregatedMetrics::U64(data) => match data { + MetricData::Sum(sum) => { + let points: Vec<_> = sum.data_points().collect(); + assert_eq!(points.len(), 1); + attributes_to_map(points[0].attributes()) + } + _ => panic!("unexpected counter aggregation"), + }, + _ => panic!("unexpected counter data type"), + }; + + let expected = BTreeMap::from([ + ( + "app.version".to_string(), + env!("CARGO_PKG_VERSION").to_string(), + ), + ("auth_mode".to_string(), AuthMode::ApiKey.to_string()), + ("model".to_string(), "gpt-5.1".to_string()), + ("service".to_string(), "codex-cli".to_string()), + ("source".to_string(), "tui".to_string()), + ]); + assert_eq!(attrs, expected); + + Ok(()) +} + +// Ensures metadata tagging can be disabled when recording via OtelManager. +#[test] +fn manager_allows_disabling_metadata_tags() -> Result<()> { + let (metrics, exporter) = build_metrics_with_defaults(&[])?; + let manager = OtelManager::new( + ThreadId::new(), + "gpt-4o", + "gpt-4o", + Some("account-id".to_string()), + None, + Some(AuthMode::ApiKey), + true, + "tty".to_string(), + SessionSource::Cli, + ) + .with_metrics_without_metadata_tags(metrics); + + manager.counter("codex.session_started", 1, &[("source", "tui")]); + manager.shutdown_metrics()?; + + let resource_metrics = latest_metrics(&exporter); + let metric = + find_metric(&resource_metrics, "codex.session_started").expect("counter metric missing"); + let attrs = match metric.data() { + AggregatedMetrics::U64(data) => match data { + MetricData::Sum(sum) => { + let points: Vec<_> = sum.data_points().collect(); + assert_eq!(points.len(), 1); + attributes_to_map(points[0].attributes()) + } + _ => panic!("unexpected counter aggregation"), + }, + _ => panic!("unexpected counter data type"), + }; + + let expected = BTreeMap::from([("source".to_string(), "tui".to_string())]); + assert_eq!(attrs, expected); + + Ok(()) +} diff --git a/codex-rs/otel/tests/suite/mod.rs b/codex-rs/otel/tests/suite/mod.rs new file mode 100644 index 00000000000..c79c7e37c4d --- /dev/null +++ b/codex-rs/otel/tests/suite/mod.rs @@ -0,0 +1,5 @@ +mod manager_metrics; +mod otlp_http_loopback; +mod send; +mod timing; +mod validation; diff --git a/codex-rs/otel/tests/suite/otlp_http_loopback.rs b/codex-rs/otel/tests/suite/otlp_http_loopback.rs new file mode 100644 index 00000000000..599021b3f54 --- /dev/null +++ b/codex-rs/otel/tests/suite/otlp_http_loopback.rs @@ -0,0 +1,192 @@ +use codex_otel::config::OtelExporter; +use codex_otel::config::OtelHttpProtocol; +use codex_otel::metrics::MetricsClient; +use codex_otel::metrics::MetricsConfig; +use codex_otel::metrics::Result; +use std::collections::HashMap; +use std::io::Read as _; +use std::io::Write as _; +use std::net::TcpListener; +use std::net::TcpStream; +use std::sync::mpsc; +use std::thread; +use std::time::Duration; +use std::time::Instant; + +struct CapturedRequest { + path: String, + content_type: Option, + body: Vec, +} + +fn read_http_request( + stream: &mut TcpStream, +) -> std::io::Result<(String, HashMap, Vec)> { + stream.set_read_timeout(Some(Duration::from_secs(2)))?; + + let mut buf = Vec::new(); + let mut scratch = [0u8; 8192]; + let header_end = loop { + let n = stream.read(&mut scratch)?; + if n == 0 { + return Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "EOF before headers", + )); + } + buf.extend_from_slice(&scratch[..n]); + if let Some(end) = buf.windows(4).position(|w| w == b"\r\n\r\n") { + break end; + } + if buf.len() > 1024 * 1024 { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "headers too large", + )); + } + }; + + let headers_bytes = &buf[..header_end]; + let mut body_bytes = buf[header_end + 4..].to_vec(); + + let headers_str = std::str::from_utf8(headers_bytes).map_err(|err| { + std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("headers not utf-8: {err}"), + ) + })?; + let mut lines = headers_str.split("\r\n"); + let start = lines.next().ok_or_else(|| { + std::io::Error::new(std::io::ErrorKind::InvalidData, "missing request line") + })?; + let mut parts = start.split_whitespace(); + let _method = parts.next().unwrap_or_default(); + let path = parts + .next() + .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidData, "missing path"))? + .to_string(); + + let mut headers = HashMap::new(); + for line in lines { + let Some((k, v)) = line.split_once(':') else { + continue; + }; + headers.insert(k.trim().to_ascii_lowercase(), v.trim().to_string()); + } + + if let Some(len) = headers + .get("content-length") + .and_then(|v| v.parse::().ok()) + { + while body_bytes.len() < len { + let n = stream.read(&mut scratch)?; + if n == 0 { + return Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "EOF before body complete", + )); + } + body_bytes.extend_from_slice(&scratch[..n]); + if body_bytes.len() > len + 1024 * 1024 { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "body too large", + )); + } + } + body_bytes.truncate(len); + } + + Ok((path, headers, body_bytes)) +} + +fn write_http_response(stream: &mut TcpStream, status: &str) -> std::io::Result<()> { + let response = format!("HTTP/1.1 {status}\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"); + stream.write_all(response.as_bytes())?; + stream.flush() +} + +#[test] +fn otlp_http_exporter_sends_metrics_to_collector() -> Result<()> { + let listener = TcpListener::bind("127.0.0.1:0").expect("bind"); + let addr = listener.local_addr().expect("local_addr"); + listener.set_nonblocking(true).expect("set_nonblocking"); + + let (tx, rx) = mpsc::channel::>(); + let server = thread::spawn(move || { + let mut captured = Vec::new(); + let deadline = Instant::now() + Duration::from_secs(3); + + while Instant::now() < deadline { + match listener.accept() { + Ok((mut stream, _)) => { + let result = read_http_request(&mut stream); + let _ = write_http_response(&mut stream, "202 Accepted"); + if let Ok((path, headers, body)) = result { + captured.push(CapturedRequest { + path, + content_type: headers.get("content-type").cloned(), + body, + }); + } + } + Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => { + thread::sleep(Duration::from_millis(10)); + } + Err(_) => break, + } + } + + let _ = tx.send(captured); + }); + + let metrics = MetricsClient::new(MetricsConfig::otlp( + "test", + "codex-cli", + env!("CARGO_PKG_VERSION"), + OtelExporter::OtlpHttp { + endpoint: format!("http://{addr}/v1/metrics"), + headers: HashMap::new(), + protocol: OtelHttpProtocol::Json, + tls: None, + }, + ))?; + + metrics.counter("codex.turns", 1, &[("source", "test")])?; + metrics.shutdown()?; + + server.join().expect("server join"); + let captured = rx.recv_timeout(Duration::from_secs(1)).expect("captured"); + + let request = captured + .iter() + .find(|req| req.path == "/v1/metrics") + .unwrap_or_else(|| { + let paths = captured + .iter() + .map(|req| req.path.as_str()) + .collect::>() + .join(", "); + panic!( + "missing /v1/metrics request; got {}: {paths}", + captured.len() + ); + }); + let content_type = request + .content_type + .as_deref() + .unwrap_or(""); + assert!( + content_type.starts_with("application/json"), + "unexpected content-type: {content_type}" + ); + + let body = String::from_utf8_lossy(&request.body); + assert!( + body.contains("codex.turns"), + "expected metric name not found; body prefix: {}", + &body.chars().take(2000).collect::() + ); + + Ok(()) +} diff --git a/codex-rs/otel/tests/suite/send.rs b/codex-rs/otel/tests/suite/send.rs new file mode 100644 index 00000000000..4e7e0279274 --- /dev/null +++ b/codex-rs/otel/tests/suite/send.rs @@ -0,0 +1,205 @@ +use crate::harness::attributes_to_map; +use crate::harness::build_metrics_with_defaults; +use crate::harness::find_metric; +use crate::harness::histogram_data; +use crate::harness::latest_metrics; +use codex_otel::metrics::Result; +use pretty_assertions::assert_eq; +use std::collections::BTreeMap; + +// Ensures counters/histograms render with default + per-call tags. +#[test] +fn send_builds_payload_with_tags_and_histograms() -> Result<()> { + let (metrics, exporter) = + build_metrics_with_defaults(&[("service", "codex-cli"), ("env", "prod")])?; + + metrics.counter("codex.turns", 1, &[("model", "gpt-5.1"), ("env", "dev")])?; + metrics.histogram("codex.tool_latency", 25, &[("tool", "shell")])?; + metrics.shutdown()?; + + let resource_metrics = latest_metrics(&exporter); + + let counter = find_metric(&resource_metrics, "codex.turns").expect("counter metric missing"); + let counter_attributes = match counter.data() { + opentelemetry_sdk::metrics::data::AggregatedMetrics::U64(data) => match data { + opentelemetry_sdk::metrics::data::MetricData::Sum(sum) => { + let points: Vec<_> = sum.data_points().collect(); + assert_eq!(points.len(), 1); + assert_eq!(points[0].value(), 1); + attributes_to_map(points[0].attributes()) + } + _ => panic!("unexpected counter aggregation"), + }, + _ => panic!("unexpected counter data type"), + }; + + let expected_counter_attributes = BTreeMap::from([ + ("service".to_string(), "codex-cli".to_string()), + ("env".to_string(), "dev".to_string()), + ("model".to_string(), "gpt-5.1".to_string()), + ]); + assert_eq!(counter_attributes, expected_counter_attributes); + + let (bounds, bucket_counts, sum, count) = + histogram_data(&resource_metrics, "codex.tool_latency"); + assert!(!bounds.is_empty()); + assert_eq!(bucket_counts.iter().sum::(), 1); + assert_eq!(sum, 25.0); + assert_eq!(count, 1); + + let histogram_attrs = attributes_to_map( + match find_metric(&resource_metrics, "codex.tool_latency").and_then(|metric| { + match metric.data() { + opentelemetry_sdk::metrics::data::AggregatedMetrics::F64( + opentelemetry_sdk::metrics::data::MetricData::Histogram(histogram), + ) => histogram + .data_points() + .next() + .map(opentelemetry_sdk::metrics::data::HistogramDataPoint::attributes), + _ => None, + } + }) { + Some(attrs) => attrs, + None => panic!("histogram attributes missing"), + }, + ); + let expected_histogram_attributes = BTreeMap::from([ + ("service".to_string(), "codex-cli".to_string()), + ("env".to_string(), "prod".to_string()), + ("tool".to_string(), "shell".to_string()), + ]); + assert_eq!(histogram_attrs, expected_histogram_attributes); + + Ok(()) +} + +// Ensures defaults merge per line and overrides take precedence. +#[test] +fn send_merges_default_tags_per_line() -> Result<()> { + let (metrics, exporter) = build_metrics_with_defaults(&[ + ("service", "codex-cli"), + ("env", "prod"), + ("region", "us"), + ])?; + + metrics.counter("codex.alpha", 1, &[("env", "dev"), ("component", "alpha")])?; + metrics.counter( + "codex.beta", + 2, + &[("service", "worker"), ("component", "beta")], + )?; + metrics.shutdown()?; + + let resource_metrics = latest_metrics(&exporter); + let alpha_metric = + find_metric(&resource_metrics, "codex.alpha").expect("codex.alpha metric missing"); + let alpha_point = match alpha_metric.data() { + opentelemetry_sdk::metrics::data::AggregatedMetrics::U64(data) => match data { + opentelemetry_sdk::metrics::data::MetricData::Sum(sum) => { + let points: Vec<_> = sum.data_points().collect(); + assert_eq!(points.len(), 1); + points[0] + } + _ => panic!("unexpected counter aggregation"), + }, + _ => panic!("unexpected counter data type"), + }; + assert_eq!(alpha_point.value(), 1); + let alpha_attrs = attributes_to_map(alpha_point.attributes()); + let expected_alpha_attrs = BTreeMap::from([ + ("component".to_string(), "alpha".to_string()), + ("env".to_string(), "dev".to_string()), + ("region".to_string(), "us".to_string()), + ("service".to_string(), "codex-cli".to_string()), + ]); + assert_eq!(alpha_attrs, expected_alpha_attrs); + + let beta_metric = + find_metric(&resource_metrics, "codex.beta").expect("codex.beta metric missing"); + let beta_point = match beta_metric.data() { + opentelemetry_sdk::metrics::data::AggregatedMetrics::U64(data) => match data { + opentelemetry_sdk::metrics::data::MetricData::Sum(sum) => { + let points: Vec<_> = sum.data_points().collect(); + assert_eq!(points.len(), 1); + points[0] + } + _ => panic!("unexpected counter aggregation"), + }, + _ => panic!("unexpected counter data type"), + }; + assert_eq!(beta_point.value(), 2); + let beta_attrs = attributes_to_map(beta_point.attributes()); + let expected_beta_attrs = BTreeMap::from([ + ("component".to_string(), "beta".to_string()), + ("env".to_string(), "prod".to_string()), + ("region".to_string(), "us".to_string()), + ("service".to_string(), "worker".to_string()), + ]); + assert_eq!(beta_attrs, expected_beta_attrs); + + Ok(()) +} + +// Verifies enqueued metrics are delivered by the background worker. +#[test] +fn client_sends_enqueued_metric() -> Result<()> { + let (metrics, exporter) = build_metrics_with_defaults(&[])?; + + metrics.counter("codex.turns", 1, &[("model", "gpt-5.1")])?; + metrics.shutdown()?; + + let resource_metrics = latest_metrics(&exporter); + let counter = find_metric(&resource_metrics, "codex.turns").expect("counter metric missing"); + let points = match counter.data() { + opentelemetry_sdk::metrics::data::AggregatedMetrics::U64(data) => match data { + opentelemetry_sdk::metrics::data::MetricData::Sum(sum) => { + sum.data_points().collect::>() + } + _ => panic!("unexpected counter aggregation"), + }, + _ => panic!("unexpected counter data type"), + }; + assert_eq!(points.len(), 1); + let point = points[0]; + assert_eq!(point.value(), 1); + let attrs = attributes_to_map(point.attributes()); + assert_eq!(attrs.get("model").map(String::as_str), Some("gpt-5.1")); + + Ok(()) +} + +// Ensures shutdown flushes successfully with in-memory exporters. +#[test] +fn shutdown_flushes_in_memory_exporter() -> Result<()> { + let (metrics, exporter) = build_metrics_with_defaults(&[])?; + + metrics.counter("codex.turns", 1, &[])?; + metrics.shutdown()?; + + let resource_metrics = latest_metrics(&exporter); + let counter = find_metric(&resource_metrics, "codex.turns").expect("counter metric missing"); + let points = match counter.data() { + opentelemetry_sdk::metrics::data::AggregatedMetrics::U64(data) => match data { + opentelemetry_sdk::metrics::data::MetricData::Sum(sum) => { + sum.data_points().collect::>() + } + _ => panic!("unexpected counter aggregation"), + }, + _ => panic!("unexpected counter data type"), + }; + assert_eq!(points.len(), 1); + + Ok(()) +} + +// Ensures shutting down without recording metrics does not export anything. +#[test] +fn shutdown_without_metrics_exports_nothing() -> Result<()> { + let (metrics, exporter) = build_metrics_with_defaults(&[])?; + + metrics.shutdown()?; + + let finished = exporter.get_finished_metrics().unwrap(); + assert!(finished.is_empty(), "expected no metrics exported"); + Ok(()) +} diff --git a/codex-rs/otel/tests/suite/timing.rs b/codex-rs/otel/tests/suite/timing.rs new file mode 100644 index 00000000000..ce4f2f982e7 --- /dev/null +++ b/codex-rs/otel/tests/suite/timing.rs @@ -0,0 +1,68 @@ +use crate::harness::attributes_to_map; +use crate::harness::build_metrics_with_defaults; +use crate::harness::histogram_data; +use crate::harness::latest_metrics; +use codex_otel::metrics::Result; +use pretty_assertions::assert_eq; +use std::time::Duration; + +// Ensures duration recording maps to histogram output. +#[test] +fn record_duration_records_histogram() -> Result<()> { + let (metrics, exporter) = build_metrics_with_defaults(&[])?; + + metrics.record_duration( + "codex.request_latency", + Duration::from_millis(15), + &[("route", "chat")], + )?; + metrics.shutdown()?; + + let (bounds, bucket_counts, sum, count) = + histogram_data(&latest_metrics(&exporter), "codex.request_latency"); + assert!(!bounds.is_empty()); + assert_eq!(bucket_counts.iter().sum::(), 1); + assert_eq!(sum, 15.0); + assert_eq!(count, 1); + + Ok(()) +} + +// Ensures time_result returns the closure output and records timing. +#[test] +fn timer_result_records_success() -> Result<()> { + let (metrics, exporter) = build_metrics_with_defaults(&[])?; + + { + let timer = metrics.start_timer("codex.request_latency", &[("route", "chat")]); + assert!(timer.is_ok()); + } + + metrics.shutdown()?; + + let resource_metrics = latest_metrics(&exporter); + let (bounds, bucket_counts, _sum, count) = + histogram_data(&resource_metrics, "codex.request_latency"); + assert!(!bounds.is_empty()); + assert_eq!(count, 1); + assert_eq!(bucket_counts.iter().sum::(), 1); + let attrs = attributes_to_map( + match crate::harness::find_metric(&resource_metrics, "codex.request_latency").and_then( + |metric| match metric.data() { + opentelemetry_sdk::metrics::data::AggregatedMetrics::F64( + opentelemetry_sdk::metrics::data::MetricData::Histogram(histogram), + ) => histogram + .data_points() + .next() + .map(opentelemetry_sdk::metrics::data::HistogramDataPoint::attributes), + _ => None, + }, + ) { + Some(attrs) => attrs, + None => panic!("attributes missing"), + }, + ); + assert_eq!(attrs.get("route").map(String::as_str), Some("chat")); + + Ok(()) +} diff --git a/codex-rs/otel/tests/suite/validation.rs b/codex-rs/otel/tests/suite/validation.rs new file mode 100644 index 00000000000..f88d9fbcd42 --- /dev/null +++ b/codex-rs/otel/tests/suite/validation.rs @@ -0,0 +1,87 @@ +use codex_otel::metrics::MetricsClient; +use codex_otel::metrics::MetricsConfig; +use codex_otel::metrics::MetricsError; +use codex_otel::metrics::Result; +use opentelemetry_sdk::metrics::InMemoryMetricExporter; + +fn build_in_memory_client() -> Result { + let exporter = InMemoryMetricExporter::default(); + let config = MetricsConfig::in_memory("test", "codex-cli", env!("CARGO_PKG_VERSION"), exporter); + MetricsClient::new(config) +} + +// Ensures invalid tag components are rejected during config build. +#[test] +fn invalid_tag_component_is_rejected() -> Result<()> { + let err = MetricsConfig::in_memory( + "test", + "codex-cli", + env!("CARGO_PKG_VERSION"), + InMemoryMetricExporter::default(), + ) + .with_tag("bad key", "value") + .unwrap_err(); + assert!(matches!( + err, + MetricsError::InvalidTagComponent { label, value } + if label == "tag key" && value == "bad key" + )); + Ok(()) +} + +// Ensures per-metric tag keys are validated. +#[test] +fn counter_rejects_invalid_tag_key() -> Result<()> { + let metrics = build_in_memory_client()?; + let err = metrics + .counter("codex.turns", 1, &[("bad key", "value")]) + .unwrap_err(); + assert!(matches!( + err, + MetricsError::InvalidTagComponent { label, value } + if label == "tag key" && value == "bad key" + )); + metrics.shutdown()?; + Ok(()) +} + +// Ensures per-metric tag values are validated. +#[test] +fn histogram_rejects_invalid_tag_value() -> Result<()> { + let metrics = build_in_memory_client()?; + let err = metrics + .histogram("codex.request_latency", 3, &[("route", "bad value")]) + .unwrap_err(); + assert!(matches!( + err, + MetricsError::InvalidTagComponent { label, value } + if label == "tag value" && value == "bad value" + )); + metrics.shutdown()?; + Ok(()) +} + +// Ensures invalid metric names are rejected. +#[test] +fn counter_rejects_invalid_metric_name() -> Result<()> { + let metrics = build_in_memory_client()?; + let err = metrics.counter("bad name", 1, &[]).unwrap_err(); + assert!(matches!( + err, + MetricsError::InvalidMetricName { name } if name == "bad name" + )); + metrics.shutdown()?; + Ok(()) +} + +#[test] +fn counter_rejects_negative_increment() -> Result<()> { + let metrics = build_in_memory_client()?; + let err = metrics.counter("codex.turns", -1, &[]).unwrap_err(); + assert!(matches!( + err, + MetricsError::NegativeCounterIncrement { name, inc } if name == "codex.turns" && inc == -1 + )); + metrics.shutdown()?; + Ok(()) +} diff --git a/codex-rs/otel/tests/tests.rs b/codex-rs/otel/tests/tests.rs new file mode 100644 index 00000000000..92f88b95fd8 --- /dev/null +++ b/codex-rs/otel/tests/tests.rs @@ -0,0 +1,2 @@ +mod harness; +mod suite; diff --git a/docs/telemetry.md b/docs/telemetry.md new file mode 100644 index 00000000000..289d6c9a08b --- /dev/null +++ b/docs/telemetry.md @@ -0,0 +1,48 @@ +# Codex Telemetry + +## Config + +**TODO(jif)**: add the config and document it + +## Tracing + +Codex can export OpenTelemetry **log events**, **trace spans**, and **metrics** +when OTEL exporters are configured in `config.toml` (`[otel]`). +By default, exporters are disabled and nothing is sent. + +## Feedback + +Feedback is sent only when you run `/feedback` and confirm. The report includes +the selected category and optional note; if you opt in to include logs, Codex +attaches the most recent in-memory logs for the session (up to ~4 MiB). + +## Metrics + +This section list all the metrics exported by Codex when locally installed. + +### Global context (applies to every event/metric) + +- `surface`: `cli` | `vscode` | `exec` | `mcp` | `subagent_*` (from `SessionSource`). +- `version`: binary version. +- `auth_mode`: `swic` (AuthMode::ChatGPT) | `api` (AuthMode::ApiKey) | `unknown`. +- `model`: name of the model used. + +## Metrics catalog + +Each metric includes the required fields plus the global context above. + +| Metric | Type | Fields | Description | +| ------------------------- | --------- | ------------------------------------- | ------------------------------------------------------------------------------- | +| `approval.requested` | counter | `tool`, `approved` | Tool approval request result (`approved`: `yes` or `no`). | +| `auth.completed` | counter | `status` | Authentication completed (only for ChatGPT authentication). | +| `conversation.compact` | counter | `status`, `number` | Compaction event including the status and the compaction number in the session. | +| `conversation.turn.count` | counter | `role` | User/assistant turns per session. | +| `feature.duration_ms` | histogram | `feature`, `status` | End-to-end feature latency. | +| `feature.used` | counter | `feature` | Feature usage through `/` (e.g., `/undo`, `/review`, ...). | +| `features.state` | counter | `key`, `value` | Feature values that differ from defaults (emit one row per non-default). | +| `mcp.call` | counter | `status` | MCP tool invocation result (`ok` or error string). | +| `model.call.duration_ms` | histogram | `provider`, `status`, `attempt` | Model API request duration. | +| `session.started` | counter | `is_git` | New session created. | +| `tool.call` | counter | `tool`, `status` | Tool invocation result (`ok` or error string). | +| `tool.call.duration_ms` | histogram | `tool`, `status` | Tool execution time. | +| `user.feedback.submitted` | counter | `category`, `include_logs`, `success` | Feedback submission via `/feedback`. |