From efc31e6251fce66dee26b3c7293f3faf158c7415 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Mon, 9 Dec 2024 18:21:04 -0800 Subject: [PATCH 1/5] Set global opentelemetry error handler to use tracing::error --- core/src/telemetry/otel.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/src/telemetry/otel.rs b/core/src/telemetry/otel.rs index 770b6da42..e4516f802 100644 --- a/core/src/telemetry/otel.rs +++ b/core/src/telemetry/otel.rs @@ -10,7 +10,7 @@ use super::{ }; use crate::{abstractions::dbg_panic, telemetry::metrics::DEFAULT_S_BUCKETS}; use opentelemetry::{ - self, + self, global, metrics::{Meter, MeterProvider as MeterProviderT}, Key, KeyValue, Value, }; @@ -160,6 +160,9 @@ impl MemoryGauge { pub fn build_otlp_metric_exporter( opts: OtelCollectorOptions, ) -> Result { + global::set_error_handler(|err| { + tracing::error!("{}", err); + })?; let mut exporter = opentelemetry_otlp::TonicExporterBuilder::default().with_endpoint(opts.url.to_string()); if opts.url.scheme() == "https" || opts.url.scheme() == "grpcs" { From 513c5331cdbc5fb8a8d29ae82668ddc6a938fe63 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Mon, 9 Dec 2024 19:25:01 -0800 Subject: [PATCH 2/5] Add OTel HTTP support --- core-api/src/telemetry.rs | 11 +++++++++++ core/Cargo.toml | 2 +- core/src/telemetry/otel.rs | 36 ++++++++++++++++++++++++------------ 3 files changed, 36 insertions(+), 13 deletions(-) diff --git a/core-api/src/telemetry.rs b/core-api/src/telemetry.rs index 7d9a5b2e4..4f4884011 100644 --- a/core-api/src/telemetry.rs +++ b/core-api/src/telemetry.rs @@ -68,6 +68,8 @@ pub struct OtelCollectorOptions { /// If set to true, use f64 seconds for durations instead of u64 milliseconds #[builder(default)] pub use_seconds_for_durations: bool, + #[builder(default = "OtlpProtocl::Grpc")] + pub protocol: OtlpProtocl, } /// Options for exporting metrics to Prometheus @@ -121,6 +123,15 @@ pub enum MetricTemporality { Delta, } +/// Options for configuring telemetry +#[derive(Debug, Clone, Copy)] +pub enum OtlpProtocl { + /// Use gRPC to communicate with the collector + Grpc, + /// Use HTTP to communicate with the collector + Http, +} + impl Default for TelemetryOptions { fn default() -> Self { TelemetryOptionsBuilder::default().build().unwrap() diff --git a/core/Cargo.toml b/core/Cargo.toml index d065a484f..1e38ae61f 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -44,7 +44,7 @@ lru = "0.12" mockall = "0.13" opentelemetry = { workspace = true, features = ["metrics"], optional = true } opentelemetry_sdk = { version = "0.24", features = ["rt-tokio", "metrics"], optional = true } -opentelemetry-otlp = { version = "0.17", features = ["tokio", "metrics", "tls"], optional = true } +opentelemetry-otlp = { version = "0.17", features = ["tokio", "metrics", "tls", "http-proto"], optional = true } opentelemetry-prometheus = { version = "0.17", optional = true } parking_lot = { version = "0.12", features = ["send_guard"] } pid = "4.0" diff --git a/core/src/telemetry/otel.rs b/core/src/telemetry/otel.rs index e4516f802..41f26a3c9 100644 --- a/core/src/telemetry/otel.rs +++ b/core/src/telemetry/otel.rs @@ -32,7 +32,7 @@ use temporal_sdk_core_api::telemetry::{ CoreMeter, Counter, Gauge, GaugeF64, Histogram, HistogramDuration, HistogramF64, MetricAttributes, MetricParameters, NewAttributes, }, - MetricTemporality, OtelCollectorOptions, PrometheusExporterOptions, + MetricTemporality, OtelCollectorOptions, OtlpProtocl, PrometheusExporterOptions, }; use tokio::task::AbortHandle; use tonic::{metadata::MetadataMap, transport::ClientTlsConfig}; @@ -163,17 +163,29 @@ pub fn build_otlp_metric_exporter( global::set_error_handler(|err| { tracing::error!("{}", err); })?; - let mut exporter = - opentelemetry_otlp::TonicExporterBuilder::default().with_endpoint(opts.url.to_string()); - if opts.url.scheme() == "https" || opts.url.scheme() == "grpcs" { - exporter = exporter.with_tls_config(ClientTlsConfig::new().with_native_roots()); - } - let exporter = exporter - .with_metadata(MetadataMap::from_headers((&opts.headers).try_into()?)) - .build_metrics_exporter( - Box::new(SDKAggSelector::new(opts.use_seconds_for_durations)), - Box::new(metric_temporality_to_selector(opts.metric_temporality)), - )?; + let exporter = match opts.protocol { + OtlpProtocl::Grpc => { + let mut exporter = opentelemetry_otlp::TonicExporterBuilder::default() + .with_endpoint(opts.url.to_string()); + if opts.url.scheme() == "https" || opts.url.scheme() == "grpcs" { + exporter = exporter.with_tls_config(ClientTlsConfig::new().with_native_roots()); + } + exporter + .with_metadata(MetadataMap::from_headers((&opts.headers).try_into()?)) + .build_metrics_exporter( + Box::new(SDKAggSelector::new(opts.use_seconds_for_durations)), + Box::new(metric_temporality_to_selector(opts.metric_temporality)), + )? + } + OtlpProtocl::Http => opentelemetry_otlp::HttpExporterBuilder::default() + .with_endpoint(opts.url.to_string()) + .with_headers(opts.headers) + .build_metrics_exporter( + Box::new(SDKAggSelector::new(opts.use_seconds_for_durations)), + Box::new(metric_temporality_to_selector(opts.metric_temporality)), + )?, + }; + let reader = PeriodicReader::builder(exporter, runtime::Tokio) .with_interval(opts.metric_periodicity) .build(); From f390cfc76cc6019f27a82546d8c78d340d5e5cde Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Mon, 9 Dec 2024 19:26:00 -0800 Subject: [PATCH 3/5] Revert "Add OTel HTTP support" This reverts commit 513c5331cdbc5fb8a8d29ae82668ddc6a938fe63. --- core-api/src/telemetry.rs | 11 ----------- core/Cargo.toml | 2 +- core/src/telemetry/otel.rs | 36 ++++++++++++------------------------ 3 files changed, 13 insertions(+), 36 deletions(-) diff --git a/core-api/src/telemetry.rs b/core-api/src/telemetry.rs index 4f4884011..7d9a5b2e4 100644 --- a/core-api/src/telemetry.rs +++ b/core-api/src/telemetry.rs @@ -68,8 +68,6 @@ pub struct OtelCollectorOptions { /// If set to true, use f64 seconds for durations instead of u64 milliseconds #[builder(default)] pub use_seconds_for_durations: bool, - #[builder(default = "OtlpProtocl::Grpc")] - pub protocol: OtlpProtocl, } /// Options for exporting metrics to Prometheus @@ -123,15 +121,6 @@ pub enum MetricTemporality { Delta, } -/// Options for configuring telemetry -#[derive(Debug, Clone, Copy)] -pub enum OtlpProtocl { - /// Use gRPC to communicate with the collector - Grpc, - /// Use HTTP to communicate with the collector - Http, -} - impl Default for TelemetryOptions { fn default() -> Self { TelemetryOptionsBuilder::default().build().unwrap() diff --git a/core/Cargo.toml b/core/Cargo.toml index 1e38ae61f..d065a484f 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -44,7 +44,7 @@ lru = "0.12" mockall = "0.13" opentelemetry = { workspace = true, features = ["metrics"], optional = true } opentelemetry_sdk = { version = "0.24", features = ["rt-tokio", "metrics"], optional = true } -opentelemetry-otlp = { version = "0.17", features = ["tokio", "metrics", "tls", "http-proto"], optional = true } +opentelemetry-otlp = { version = "0.17", features = ["tokio", "metrics", "tls"], optional = true } opentelemetry-prometheus = { version = "0.17", optional = true } parking_lot = { version = "0.12", features = ["send_guard"] } pid = "4.0" diff --git a/core/src/telemetry/otel.rs b/core/src/telemetry/otel.rs index 41f26a3c9..e4516f802 100644 --- a/core/src/telemetry/otel.rs +++ b/core/src/telemetry/otel.rs @@ -32,7 +32,7 @@ use temporal_sdk_core_api::telemetry::{ CoreMeter, Counter, Gauge, GaugeF64, Histogram, HistogramDuration, HistogramF64, MetricAttributes, MetricParameters, NewAttributes, }, - MetricTemporality, OtelCollectorOptions, OtlpProtocl, PrometheusExporterOptions, + MetricTemporality, OtelCollectorOptions, PrometheusExporterOptions, }; use tokio::task::AbortHandle; use tonic::{metadata::MetadataMap, transport::ClientTlsConfig}; @@ -163,29 +163,17 @@ pub fn build_otlp_metric_exporter( global::set_error_handler(|err| { tracing::error!("{}", err); })?; - let exporter = match opts.protocol { - OtlpProtocl::Grpc => { - let mut exporter = opentelemetry_otlp::TonicExporterBuilder::default() - .with_endpoint(opts.url.to_string()); - if opts.url.scheme() == "https" || opts.url.scheme() == "grpcs" { - exporter = exporter.with_tls_config(ClientTlsConfig::new().with_native_roots()); - } - exporter - .with_metadata(MetadataMap::from_headers((&opts.headers).try_into()?)) - .build_metrics_exporter( - Box::new(SDKAggSelector::new(opts.use_seconds_for_durations)), - Box::new(metric_temporality_to_selector(opts.metric_temporality)), - )? - } - OtlpProtocl::Http => opentelemetry_otlp::HttpExporterBuilder::default() - .with_endpoint(opts.url.to_string()) - .with_headers(opts.headers) - .build_metrics_exporter( - Box::new(SDKAggSelector::new(opts.use_seconds_for_durations)), - Box::new(metric_temporality_to_selector(opts.metric_temporality)), - )?, - }; - + let mut exporter = + opentelemetry_otlp::TonicExporterBuilder::default().with_endpoint(opts.url.to_string()); + if opts.url.scheme() == "https" || opts.url.scheme() == "grpcs" { + exporter = exporter.with_tls_config(ClientTlsConfig::new().with_native_roots()); + } + let exporter = exporter + .with_metadata(MetadataMap::from_headers((&opts.headers).try_into()?)) + .build_metrics_exporter( + Box::new(SDKAggSelector::new(opts.use_seconds_for_durations)), + Box::new(metric_temporality_to_selector(opts.metric_temporality)), + )?; let reader = PeriodicReader::builder(exporter, runtime::Tokio) .with_interval(opts.metric_periodicity) .build(); From 6d582af4f979fba4cf6fa899bb4162af94ae7811 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Fri, 13 Dec 2024 12:15:47 -0800 Subject: [PATCH 4/5] attempting to write a new test to validate this new setting --- tests/integ_tests/metrics_tests.rs | 123 +++++++++++++++++++++++++++++ 1 file changed, 123 insertions(+) diff --git a/tests/integ_tests/metrics_tests.rs b/tests/integ_tests/metrics_tests.rs index c4f11c8c8..291c1e7bc 100644 --- a/tests/integ_tests/metrics_tests.rs +++ b/tests/integ_tests/metrics_tests.rs @@ -1,5 +1,6 @@ use anyhow::anyhow; use assert_matches::assert_matches; +use std::sync::Mutex; use std::{collections::HashMap, env, net::SocketAddr, sync::Arc, time::Duration}; use temporal_client::{ WorkflowClientTrait, WorkflowOptions, WorkflowService, REQUEST_LATENCY_HISTOGRAM_NAME, @@ -45,6 +46,7 @@ use temporal_sdk_core_test_utils::{ get_integ_server_options, get_integ_telem_options, CoreWfStarter, NAMESPACE, OTEL_URL_ENV_VAR, }; use tokio::{join, sync::Barrier, task::AbortHandle}; +use tracing_subscriber::fmt::MakeWriter; use url::Url; static ANY_PORT: &str = "127.0.0.1:0"; @@ -815,3 +817,124 @@ async fn evict_on_complete_does_not_count_as_forced_eviction() { // Metric shouldn't show up at all, since it's zero the whole time. assert!(!body.contains("temporal_sticky_cache_total_forced_eviction")); } + +#[tokio::test] +async fn test_otel_errors_are_logged_as_errors() { + // Create a subscriber that captures logs + let collector = tracing_subscriber::fmt() + .with_max_level(tracing::Level::ERROR) + .with_test_writer() + .finish(); + + // Set this subscriber as default + let _guard = tracing::subscriber::set_default(collector); + + let opts = OtelCollectorOptionsBuilder::default() + // Intentionally invalid endpoint + .url("http://localhost:9999/v1/metrics".parse().unwrap()) + // .protocol(OtlpProtocol::Http) + .build() + .unwrap(); + + // This exporter will fail every time it tries to export metrics + let exporter = build_otlp_metric_exporter(opts).unwrap(); + +} + +// use std::sync::{Arc, Mutex}; +// use std::time::Duration; +// use temporal_sdk_core::{ +// CoreRuntime, TelemetryOptionsBuilder, telemetry::{build_otlp_metric_exporter, OtlpProtocol, OtelCollectorOptionsBuilder, CoreMeter}, +// }; +// use tracing_subscriber::fmt::MakeWriter; +// use tracing::Level; +// use tokio::time::sleep; + +// A writer that captures logs into a buffer so we can assert on them. +struct CapturingWriter { + buf: Arc>>, +} + +impl MakeWriter<'_> for CapturingWriter { + type Writer = CapturingHandle; + + fn make_writer(&self) -> Self::Writer { + CapturingHandle(self.buf.clone()) + } +} + +struct CapturingHandle(Arc>>); + +impl std::io::Write for CapturingHandle { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + let mut b = self.0.lock().unwrap(); + b.extend_from_slice(buf); + Ok(buf.len()) + } + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_otel_error_logged() { + // Set up tracing subscriber to capture ERROR logs + let logs = Arc::new(Mutex::new(Vec::new())); + let writer = CapturingWriter { buf: logs.clone() }; + + let subscriber = tracing_subscriber::fmt() + .with_max_level(tracing::Level::TRACE) + .with_writer(writer) + .finish(); + let _guard = tracing::subscriber::set_default(subscriber); + + // Configure OTLP exporter with an invalid endpoint so it fails + let opts = OtelCollectorOptionsBuilder::default() + .url("http://localhost:9999/v1/metrics".parse().unwrap()) // Invalid endpoint + // .protocol(OtlpProtocol::Http) + .build() + .unwrap(); + let exporter = build_otlp_metric_exporter(opts).unwrap(); + + let telemopts = TelemetryOptionsBuilder::default() + .metrics(Arc::new(exporter) as Arc) + .build() + .unwrap(); + + let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap(); + + let opts = get_integ_server_options(); + let mut raw_client = opts + .connect_no_namespace(rt.telemetry().get_temporal_metric_meter()) + .await + .unwrap(); + assert!(raw_client.get_client().capabilities().is_some()); + + let _ = raw_client + .list_namespaces(ListNamespacesRequest::default()) + .await + .unwrap(); + + // Trigger metric emission or just wait for exporter attempts + // If you have a Temporal client to generate metrics, you can do so here. + // For now, just wait to allow exporter to attempt sending metrics and fail. + tokio::time::sleep(Duration::from_secs(5)).await; + + + // Check the captured logs + let logs = logs.lock().unwrap(); + let log_str = String::from_utf8_lossy(&logs); + + // Assert that there is an error log + assert!( + log_str.contains("ERROR"), + "Expected ERROR log not found in logs: {}", + log_str + ); + // Look for some substring that indicates OTLP export failed + assert!( + log_str.contains("failed") || log_str.contains("error"), + "Expected an OTel exporter error message in logs: {}", + log_str + ); +} \ No newline at end of file From 5386ced03a2a0185909ce178edf328be272806aa Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Mon, 16 Dec 2024 10:10:33 -0800 Subject: [PATCH 5/5] Added test --- tests/integ_tests/metrics_tests.rs | 75 ++++-------------------------- 1 file changed, 10 insertions(+), 65 deletions(-) diff --git a/tests/integ_tests/metrics_tests.rs b/tests/integ_tests/metrics_tests.rs index 291c1e7bc..7d009f995 100644 --- a/tests/integ_tests/metrics_tests.rs +++ b/tests/integ_tests/metrics_tests.rs @@ -818,39 +818,6 @@ async fn evict_on_complete_does_not_count_as_forced_eviction() { assert!(!body.contains("temporal_sticky_cache_total_forced_eviction")); } -#[tokio::test] -async fn test_otel_errors_are_logged_as_errors() { - // Create a subscriber that captures logs - let collector = tracing_subscriber::fmt() - .with_max_level(tracing::Level::ERROR) - .with_test_writer() - .finish(); - - // Set this subscriber as default - let _guard = tracing::subscriber::set_default(collector); - - let opts = OtelCollectorOptionsBuilder::default() - // Intentionally invalid endpoint - .url("http://localhost:9999/v1/metrics".parse().unwrap()) - // .protocol(OtlpProtocol::Http) - .build() - .unwrap(); - - // This exporter will fail every time it tries to export metrics - let exporter = build_otlp_metric_exporter(opts).unwrap(); - -} - -// use std::sync::{Arc, Mutex}; -// use std::time::Duration; -// use temporal_sdk_core::{ -// CoreRuntime, TelemetryOptionsBuilder, telemetry::{build_otlp_metric_exporter, OtlpProtocol, OtelCollectorOptionsBuilder, CoreMeter}, -// }; -// use tracing_subscriber::fmt::MakeWriter; -// use tracing::Level; -// use tokio::time::sleep; - -// A writer that captures logs into a buffer so we can assert on them. struct CapturingWriter { buf: Arc>>, } @@ -876,22 +843,16 @@ impl std::io::Write for CapturingHandle { } } -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn test_otel_error_logged() { +#[tokio::test] +async fn otel_errors_logged_as_errors() { // Set up tracing subscriber to capture ERROR logs let logs = Arc::new(Mutex::new(Vec::new())); let writer = CapturingWriter { buf: logs.clone() }; - - let subscriber = tracing_subscriber::fmt() - .with_max_level(tracing::Level::TRACE) - .with_writer(writer) - .finish(); + let subscriber = tracing_subscriber::fmt().with_writer(writer).finish(); let _guard = tracing::subscriber::set_default(subscriber); - // Configure OTLP exporter with an invalid endpoint so it fails let opts = OtelCollectorOptionsBuilder::default() - .url("http://localhost:9999/v1/metrics".parse().unwrap()) // Invalid endpoint - // .protocol(OtlpProtocol::Http) + .url("https://localhostt:9995/v1/metrics".parse().unwrap()) // Invalid endpoint .build() .unwrap(); let exporter = build_otlp_metric_exporter(opts).unwrap(); @@ -902,39 +863,23 @@ async fn test_otel_error_logged() { .unwrap(); let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap(); + let mut starter = CoreWfStarter::new_with_runtime("otel_errors_logged_as_errors", rt); + let _worker = starter.get_worker().await; - let opts = get_integ_server_options(); - let mut raw_client = opts - .connect_no_namespace(rt.telemetry().get_temporal_metric_meter()) - .await - .unwrap(); - assert!(raw_client.get_client().capabilities().is_some()); - - let _ = raw_client - .list_namespaces(ListNamespacesRequest::default()) - .await - .unwrap(); + // Wait to allow exporter to attempt sending metrics and fail. + tokio::time::sleep(Duration::from_secs(2)).await; - // Trigger metric emission or just wait for exporter attempts - // If you have a Temporal client to generate metrics, you can do so here. - // For now, just wait to allow exporter to attempt sending metrics and fail. - tokio::time::sleep(Duration::from_secs(5)).await; - - - // Check the captured logs let logs = logs.lock().unwrap(); let log_str = String::from_utf8_lossy(&logs); - // Assert that there is an error log assert!( log_str.contains("ERROR"), "Expected ERROR log not found in logs: {}", log_str ); - // Look for some substring that indicates OTLP export failed assert!( - log_str.contains("failed") || log_str.contains("error"), + log_str.contains("Metrics exporter otlp failed with the grpc server returns error"), "Expected an OTel exporter error message in logs: {}", log_str ); -} \ No newline at end of file +}