From 4eb3c7294070278c9ca376707d01d57fb990d7cf Mon Sep 17 00:00:00 2001 From: Connor Hindley Date: Wed, 23 Apr 2025 10:21:56 -0500 Subject: [PATCH 1/8] 0.26 -> 0.27 --- Cargo.toml | 2 +- core/Cargo.toml | 6 ++-- core/src/telemetry/otel.rs | 61 ++++++++++++++++---------------------- 3 files changed, 29 insertions(+), 40 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4ada4bf3d..3508dc008 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ derive_more = { version = "2.0", features = ["constructor", "display", "from", " thiserror = "2" tonic = "0.12" tonic-build = "0.12" -opentelemetry = { version = "0.26", features = ["metrics"] } +opentelemetry = { version = "0.27", features = ["metrics"] } prost = "0.13" prost-types = "0.13" diff --git a/core/Cargo.toml b/core/Cargo.toml index fbf048a15..808ff187b 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -43,9 +43,9 @@ itertools = "0.14" lru = "0.13" mockall = "0.13" opentelemetry = { workspace = true, features = ["metrics"], optional = true } -opentelemetry_sdk = { version = "0.26", features = ["rt-tokio", "metrics"], optional = true } -opentelemetry-otlp = { version = "0.26", features = ["tokio", "metrics", "tls", "http-proto", "reqwest-client", ], optional = true } -opentelemetry-prometheus = { git = "https://github.com/open-telemetry/opentelemetry-rust.git", rev = "e911383", optional = true } +opentelemetry_sdk = { version = "0.27", features = ["rt-tokio", "metrics", "spec_unstable_metrics_views"], optional = true } +opentelemetry-otlp = { version = "0.27", features = ["tokio", "metrics", "tls", "http-proto", "reqwest-client", ], optional = true } +opentelemetry-prometheus = { version = "0.27", optional = true } parking_lot = { version = "0.12", features = ["send_guard"] } pid = "4.0" pin-project = "1.0" diff --git a/core/src/telemetry/otel.rs b/core/src/telemetry/otel.rs index 187ef1449..1e4611d75 100644 --- a/core/src/telemetry/otel.rs +++ b/core/src/telemetry/otel.rs @@ -11,17 +11,14 @@ use super::{ }; use crate::{abstractions::dbg_panic, telemetry::metrics::DEFAULT_S_BUCKETS}; use opentelemetry::{ - self, Key, KeyValue, Value, global, + self, Key, KeyValue, Value, metrics::{Meter, MeterProvider as MeterProviderT}, }; -use opentelemetry_otlp::WithExportConfig; +use opentelemetry_otlp::{WithExportConfig, WithHttpConfig, WithTonicConfig}; use opentelemetry_sdk::{ - Resource, metrics::{ - Aggregation, Instrument, InstrumentKind, MeterProviderBuilder, PeriodicReader, - SdkMeterProvider, View, data::Temporality, new_view, reader::TemporalitySelector, - }, - runtime, + new_view, Temporality, Aggregation, Instrument, InstrumentKind, MeterProviderBuilder, MetricError, PeriodicReader, SdkMeterProvider, View + }, runtime, Resource }; use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration}; use temporal_sdk_core_api::telemetry::{ @@ -35,10 +32,13 @@ use temporal_sdk_core_api::telemetry::{ use tokio::task::AbortHandle; use tonic::{metadata::MetadataMap, transport::ClientTlsConfig}; +/// A specialized `Result` type for metric operations. +type Result = std::result::Result; + fn histo_view( metric_name: &'static str, use_seconds: bool, -) -> opentelemetry::metrics::Result> { +) -> Result> { let buckets = default_buckets_for(metric_name, use_seconds); new_view( Instrument::new().name(format!("*{metric_name}")), @@ -56,7 +56,7 @@ pub(super) fn augment_meter_provider_with_defaults( global_tags: &HashMap, use_seconds: bool, bucket_overrides: HistogramBucketOverrides, -) -> opentelemetry::metrics::Result { +) -> Result { for (name, buckets) in bucket_overrides.overrides { mpb = mpb.with_view(new_view( Instrument::new().name(format!("*{name}")), @@ -117,29 +117,27 @@ pub(super) fn augment_meter_provider_with_defaults( /// Create an OTel meter that can be used as a [CoreMeter] to export metrics over OTLP. pub fn build_otlp_metric_exporter( opts: OtelCollectorOptions, -) -> Result { - global::set_error_handler(|err| { - tracing::error!("{}", err); - })?; +) -> std::result::Result { let exporter = match opts.protocol { OtlpProtocol::Grpc => { - let mut exporter = opentelemetry_otlp::TonicExporterBuilder::default() + let mut exporter = opentelemetry_otlp::MetricExporter::builder() + .with_tonic() .with_endpoint(opts.url.to_string()); if opts.url.scheme() == "https" || opts.url.scheme() == "grpcs" { exporter = exporter.with_tls_config(ClientTlsConfig::new().with_native_roots()); } exporter .with_metadata(MetadataMap::from_headers((&opts.headers).try_into()?)) - .build_metrics_exporter(Box::new(metric_temporality_to_selector( + .with_temporality(metric_temporality_to_temporality( opts.metric_temporality, - )))? + )).build()? } OtlpProtocol::Http => opentelemetry_otlp::HttpExporterBuilder::default() .with_endpoint(opts.url.to_string()) .with_headers(opts.headers) - .build_metrics_exporter(Box::new(metric_temporality_to_selector( + .build_metrics_exporter(metric_temporality_to_temporality( opts.metric_temporality, - )))?, + ))?, }; let reader = PeriodicReader::builder(exporter, runtime::Tokio) .with_interval(opts.metric_periodicity) @@ -170,7 +168,7 @@ pub struct StartedPromServer { /// Requires a Tokio runtime to exist, and will block briefly while binding the server endpoint. pub fn start_prometheus_metric_exporter( opts: PrometheusExporterOptions, -) -> Result { +) -> std::result::Result { let (srv, exporter) = PromServer::new(&opts)?; let meter_provider = augment_meter_provider_with_defaults( MeterProviderBuilder::default().with_reader(exporter), @@ -228,7 +226,7 @@ impl CoreMeter for CoreOtelMeter { .u64_counter(params.name) .with_unit(params.unit) .with_description(params.description) - .init(), + .build(), ) } @@ -238,7 +236,7 @@ impl CoreMeter for CoreOtelMeter { .u64_histogram(params.name) .with_unit(params.unit) .with_description(params.description) - .init(), + .build(), ) } @@ -248,7 +246,7 @@ impl CoreMeter for CoreOtelMeter { .f64_histogram(params.name) .with_unit(params.unit) .with_description(params.description) - .init(), + .build(), ) } @@ -268,7 +266,7 @@ impl CoreMeter for CoreOtelMeter { .u64_gauge(params.name) .with_unit(params.unit) .with_description(params.description) - .init(), + .build(), ) } @@ -278,7 +276,7 @@ impl CoreMeter for CoreOtelMeter { .f64_gauge(params.name) .with_unit(params.unit) .with_description(params.description) - .init(), + .build(), ) } } @@ -325,19 +323,10 @@ fn default_resource(override_values: &HashMap) -> Resource { .merge(&Resource::new(override_kvs)) } -#[derive(Clone)] -struct ConstantTemporality(Temporality); - -impl TemporalitySelector for ConstantTemporality { - fn temporality(&self, _: InstrumentKind) -> Temporality { - self.0 - } -} - -fn metric_temporality_to_selector(t: MetricTemporality) -> impl TemporalitySelector + Clone { +fn metric_temporality_to_temporality(t: MetricTemporality) -> Temporality { match t { - MetricTemporality::Cumulative => ConstantTemporality(Temporality::Cumulative), - MetricTemporality::Delta => ConstantTemporality(Temporality::Delta), + MetricTemporality::Cumulative => Temporality::Cumulative, + MetricTemporality::Delta => Temporality::Delta, } } From 54c1681f7dee26f23617d186dc3cb1b06464da34 Mon Sep 17 00:00:00 2001 From: Connor Hindley Date: Wed, 23 Apr 2025 10:28:37 -0500 Subject: [PATCH 2/8] 0.27 -> 0.28 --- Cargo.toml | 2 +- core/Cargo.toml | 6 ++-- core/src/telemetry/otel.rs | 63 ++++++++++++++++++++++++-------------- 3 files changed, 44 insertions(+), 27 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3508dc008..b1cefa0f2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ derive_more = { version = "2.0", features = ["constructor", "display", "from", " thiserror = "2" tonic = "0.12" tonic-build = "0.12" -opentelemetry = { version = "0.27", features = ["metrics"] } +opentelemetry = { version = "0.28", features = ["metrics"] } prost = "0.13" prost-types = "0.13" diff --git a/core/Cargo.toml b/core/Cargo.toml index 808ff187b..0fc7ece21 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -43,9 +43,9 @@ itertools = "0.14" lru = "0.13" mockall = "0.13" opentelemetry = { workspace = true, features = ["metrics"], optional = true } -opentelemetry_sdk = { version = "0.27", features = ["rt-tokio", "metrics", "spec_unstable_metrics_views"], optional = true } -opentelemetry-otlp = { version = "0.27", features = ["tokio", "metrics", "tls", "http-proto", "reqwest-client", ], optional = true } -opentelemetry-prometheus = { version = "0.27", optional = true } +opentelemetry_sdk = { version = "0.28", features = ["rt-tokio", "metrics", "spec_unstable_metrics_views"], optional = true } +opentelemetry-otlp = { version = "0.28", features = ["tokio", "metrics", "tls", "http-proto", "reqwest-client", "grpc-tonic" ], optional = true } +opentelemetry-prometheus = { version = "0.28", optional = true } parking_lot = { version = "0.12", features = ["send_guard"] } pid = "4.0" pin-project = "1.0" diff --git a/core/src/telemetry/otel.rs b/core/src/telemetry/otel.rs index 1e4611d75..639ce61c7 100644 --- a/core/src/telemetry/otel.rs +++ b/core/src/telemetry/otel.rs @@ -16,9 +16,11 @@ use opentelemetry::{ }; use opentelemetry_otlp::{WithExportConfig, WithHttpConfig, WithTonicConfig}; use opentelemetry_sdk::{ + Resource, metrics::{ - new_view, Temporality, Aggregation, Instrument, InstrumentKind, MeterProviderBuilder, MetricError, PeriodicReader, SdkMeterProvider, View - }, runtime, Resource + Aggregation, Instrument, InstrumentKind, MeterProviderBuilder, MetricError, PeriodicReader, + SdkMeterProvider, Temporality, View, new_view, + }, }; use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration}; use temporal_sdk_core_api::telemetry::{ @@ -35,10 +37,7 @@ use tonic::{metadata::MetadataMap, transport::ClientTlsConfig}; /// A specialized `Result` type for metric operations. type Result = std::result::Result; -fn histo_view( - metric_name: &'static str, - use_seconds: bool, -) -> Result> { +fn histo_view(metric_name: &'static str, use_seconds: bool) -> Result> { let buckets = default_buckets_for(metric_name, use_seconds); new_view( Instrument::new().name(format!("*{metric_name}")), @@ -128,18 +127,15 @@ pub fn build_otlp_metric_exporter( } exporter .with_metadata(MetadataMap::from_headers((&opts.headers).try_into()?)) - .with_temporality(metric_temporality_to_temporality( - opts.metric_temporality, - )).build()? + .with_temporality(metric_temporality_to_temporality(opts.metric_temporality)) + .build()? } OtlpProtocol::Http => opentelemetry_otlp::HttpExporterBuilder::default() .with_endpoint(opts.url.to_string()) .with_headers(opts.headers) - .build_metrics_exporter(metric_temporality_to_temporality( - opts.metric_temporality, - ))?, + .build_metrics_exporter(metric_temporality_to_temporality(opts.metric_temporality))?, }; - let reader = PeriodicReader::builder(exporter, runtime::Tokio) + let reader = PeriodicReader::builder(exporter) .with_interval(opts.metric_periodicity) .build(); let mp = augment_meter_provider_with_defaults( @@ -301,14 +297,25 @@ fn default_resource_instance() -> &'static Resource { static INSTANCE: OnceLock = OnceLock::new(); INSTANCE.get_or_init(|| { - let resource = Resource::default(); - if resource.get(Key::from("service.name")) == Some(Value::from("unknown_service")) { + let resource = Resource::builder().build(); + if resource.get(&Key::from("service.name")) == Some(Value::from("unknown_service")) { // otel spec recommends to leave service.name as unknown_service but we want to // maintain backwards compatability with existing library behaviour - return resource.merge(&Resource::new([KeyValue::new( - "service.name", - TELEM_SERVICE_NAME, - )])); + let compat = Resource::builder() + .with_attribute(KeyValue::new("service.name", TELEM_SERVICE_NAME)) + .build(); + return Resource::builder_empty() + .with_attributes( + resource + .iter() + .map(|(k, v)| KeyValue::new(k.clone(), v.clone())), + ) + .with_attributes( + compat + .iter() + .map(|(k, v)| KeyValue::new(k.clone(), v.clone())), + ) + .build(); } resource }) @@ -318,9 +325,19 @@ fn default_resource(override_values: &HashMap) -> Resource { let override_kvs = override_values .iter() .map(|(k, v)| KeyValue::new(k.clone(), v.clone())); - default_resource_instance() - .clone() - .merge(&Resource::new(override_kvs)) + let override_resource = Resource::builder().with_attributes(override_kvs).build(); + Resource::builder_empty() + .with_attributes( + default_resource_instance() + .iter() + .map(|(k, v)| KeyValue::new(k.clone(), v.clone())), + ) + .with_attributes( + override_resource + .iter() + .map(|(k, v)| KeyValue::new(k.clone(), v.clone())), + ) + .build() } fn metric_temporality_to_temporality(t: MetricTemporality) -> Temporality { @@ -338,7 +355,7 @@ pub(crate) mod tests { #[test] pub(crate) fn default_resource_instance_service_name_default() { let resource = default_resource_instance(); - let service_name = resource.get(Key::from("service.name")); + let service_name = resource.get(&Key::from("service.name")); assert_eq!(service_name, Some(Value::from(TELEM_SERVICE_NAME))); } } From 0c6d4fbe77d3c98e3b5896cfe55c1ca0b02b4504 Mon Sep 17 00:00:00 2001 From: Connor Hindley Date: Wed, 23 Apr 2025 10:30:13 -0500 Subject: [PATCH 3/8] 0.28 -> 0.29 --- Cargo.toml | 2 +- core/Cargo.toml | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b1cefa0f2..0dd3e7749 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ derive_more = { version = "2.0", features = ["constructor", "display", "from", " thiserror = "2" tonic = "0.12" tonic-build = "0.12" -opentelemetry = { version = "0.28", features = ["metrics"] } +opentelemetry = { version = "0.29", features = ["metrics"] } prost = "0.13" prost-types = "0.13" diff --git a/core/Cargo.toml b/core/Cargo.toml index 0fc7ece21..501b2264b 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -43,13 +43,13 @@ itertools = "0.14" lru = "0.13" mockall = "0.13" opentelemetry = { workspace = true, features = ["metrics"], optional = true } -opentelemetry_sdk = { version = "0.28", features = ["rt-tokio", "metrics", "spec_unstable_metrics_views"], optional = true } -opentelemetry-otlp = { version = "0.28", features = ["tokio", "metrics", "tls", "http-proto", "reqwest-client", "grpc-tonic" ], optional = true } -opentelemetry-prometheus = { version = "0.28", optional = true } +opentelemetry_sdk = { version = "0.29", features = ["rt-tokio", "metrics", "spec_unstable_metrics_views"], optional = true } +opentelemetry-otlp = { version = "0.29", features = ["tokio", "metrics", "tls", "http-proto", "reqwest-client", "grpc-tonic" ], optional = true } +opentelemetry-prometheus = { version = "0.29", optional = true } parking_lot = { version = "0.12", features = ["send_guard"] } pid = "4.0" pin-project = "1.0" -prometheus = "0.13" +prometheus = "0.14" prost = { workspace = true } prost-types = { version = "0.6", package = "prost-wkt-types" } rand = "0.9" From cbdd6b0b1561b704a47d35ad75f3c52ab4e3e372 Mon Sep 17 00:00:00 2001 From: Connor Hindley Date: Wed, 23 Apr 2025 14:16:54 -0500 Subject: [PATCH 4/8] drop otel_errors_logged_as_errors test upstream change made this functionality tricky https://github.com/open-telemetry/opentelemetry-rust/issues/2697 https://github.com/open-telemetry/opentelemetry-rust/pull/2260 --- tests/integ_tests/metrics_tests.rs | 42 ------------------------------ 1 file changed, 42 deletions(-) diff --git a/tests/integ_tests/metrics_tests.rs b/tests/integ_tests/metrics_tests.rs index eb06e9fcf..045c7e42f 100644 --- a/tests/integ_tests/metrics_tests.rs +++ b/tests/integ_tests/metrics_tests.rs @@ -1136,45 +1136,3 @@ impl std::io::Write for CapturingHandle { Ok(()) } } - -#[tokio::test] -async fn otel_errors_logged_as_errors() { - // Set up tracing subscriber to capture ERROR logs - let logs = Arc::new(Mutex::new(Vec::new())); - let writer = CapturingWriter { buf: logs.clone() }; - let subscriber = tracing_subscriber::fmt().with_writer(writer).finish(); - let _guard = tracing::subscriber::set_default(subscriber); - - let opts = OtelCollectorOptionsBuilder::default() - .url("https://localhost:12345/v1/metrics".parse().unwrap()) // Nothing bound on that port - .build() - .unwrap(); - let exporter = build_otlp_metric_exporter(opts).unwrap(); - - let telemopts = TelemetryOptionsBuilder::default() - .metrics(Arc::new(exporter) as Arc) - .build() - .unwrap(); - - let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap(); - let mut starter = CoreWfStarter::new_with_runtime("otel_errors_logged_as_errors", rt); - let _worker = starter.get_worker().await; - - // Wait to allow exporter to attempt sending metrics and fail. - // Windows takes a while to fail the network attempt for some reason so 5s. - tokio::time::sleep(Duration::from_secs(5)).await; - - let logs = logs.lock().unwrap(); - let log_str = String::from_utf8_lossy(&logs); - - assert!( - log_str.contains("ERROR"), - "Expected ERROR log not found in logs: {}", - log_str - ); - assert!( - log_str.contains("Metrics exporter otlp failed with the grpc server returns error"), - "Expected an OTel exporter error message in logs: {}", - log_str - ); -} From 3d2bfc1055dff81de94d174afd40157b9b21b218 Mon Sep 17 00:00:00 2001 From: Connor Hindley Date: Wed, 23 Apr 2025 16:02:33 -0500 Subject: [PATCH 5/8] fix bad resource attrs/merge --- core/src/telemetry/otel.rs | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/core/src/telemetry/otel.rs b/core/src/telemetry/otel.rs index 639ce61c7..427e02032 100644 --- a/core/src/telemetry/otel.rs +++ b/core/src/telemetry/otel.rs @@ -130,10 +130,12 @@ pub fn build_otlp_metric_exporter( .with_temporality(metric_temporality_to_temporality(opts.metric_temporality)) .build()? } - OtlpProtocol::Http => opentelemetry_otlp::HttpExporterBuilder::default() + OtlpProtocol::Http => opentelemetry_otlp::MetricExporter::builder() + .with_http() .with_endpoint(opts.url.to_string()) .with_headers(opts.headers) - .build_metrics_exporter(metric_temporality_to_temporality(opts.metric_temporality))?, + .with_temporality(metric_temporality_to_temporality(opts.metric_temporality)) + .build()?, }; let reader = PeriodicReader::builder(exporter) .with_interval(opts.metric_periodicity) @@ -301,20 +303,13 @@ fn default_resource_instance() -> &'static Resource { if resource.get(&Key::from("service.name")) == Some(Value::from("unknown_service")) { // otel spec recommends to leave service.name as unknown_service but we want to // maintain backwards compatability with existing library behaviour - let compat = Resource::builder() - .with_attribute(KeyValue::new("service.name", TELEM_SERVICE_NAME)) - .build(); return Resource::builder_empty() .with_attributes( resource .iter() .map(|(k, v)| KeyValue::new(k.clone(), v.clone())), ) - .with_attributes( - compat - .iter() - .map(|(k, v)| KeyValue::new(k.clone(), v.clone())), - ) + .with_attribute(KeyValue::new("service.name", TELEM_SERVICE_NAME)) .build(); } resource @@ -325,7 +320,9 @@ fn default_resource(override_values: &HashMap) -> Resource { let override_kvs = override_values .iter() .map(|(k, v)| KeyValue::new(k.clone(), v.clone())); - let override_resource = Resource::builder().with_attributes(override_kvs).build(); + let override_resource = Resource::builder_empty() + .with_attributes(override_kvs) + .build(); Resource::builder_empty() .with_attributes( default_resource_instance() From d1bf3418a586760cb461c06f6740c6dbbda40532 Mon Sep 17 00:00:00 2001 From: Connor Hindley Date: Wed, 23 Apr 2025 16:19:22 -0500 Subject: [PATCH 6/8] fix lint --- tests/integ_tests/metrics_tests.rs | 28 +--------------------------- 1 file changed, 1 insertion(+), 27 deletions(-) diff --git a/tests/integ_tests/metrics_tests.rs b/tests/integ_tests/metrics_tests.rs index 045c7e42f..a2ffd0f20 100644 --- a/tests/integ_tests/metrics_tests.rs +++ b/tests/integ_tests/metrics_tests.rs @@ -5,7 +5,7 @@ use std::{ collections::HashMap, env, string::ToString, - sync::{Arc, Mutex}, + sync::Arc, time::Duration, }; use temporal_client::{ @@ -60,7 +60,6 @@ use temporal_sdk_core_test_utils::{ get_integ_server_options, get_integ_telem_options, prom_metrics, }; use tokio::{join, sync::Barrier}; -use tracing_subscriber::fmt::MakeWriter; use url::Url; pub(crate) async fn get_text(endpoint: String) -> String { @@ -1111,28 +1110,3 @@ async fn evict_on_complete_does_not_count_as_forced_eviction() { // Metric shouldn't show up at all, since it's zero the whole time. assert!(!body.contains("temporal_sticky_cache_total_forced_eviction")); } - -struct CapturingWriter { - buf: Arc>>, -} - -impl MakeWriter<'_> for CapturingWriter { - type Writer = CapturingHandle; - - fn make_writer(&self) -> Self::Writer { - CapturingHandle(self.buf.clone()) - } -} - -struct CapturingHandle(Arc>>); - -impl std::io::Write for CapturingHandle { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - let mut b = self.0.lock().unwrap(); - b.extend_from_slice(buf); - Ok(buf.len()) - } - fn flush(&mut self) -> std::io::Result<()> { - Ok(()) - } -} From 1044aee8fb5ea9118a7d7d4cfa1450db0888b261 Mon Sep 17 00:00:00 2001 From: Connor Hindley Date: Thu, 24 Apr 2025 08:06:26 -0500 Subject: [PATCH 7/8] fix lint and simplify --- core/src/telemetry/otel.rs | 8 +------- tests/integ_tests/metrics_tests.rs | 8 +------- 2 files changed, 2 insertions(+), 14 deletions(-) diff --git a/core/src/telemetry/otel.rs b/core/src/telemetry/otel.rs index 427e02032..bd8a29569 100644 --- a/core/src/telemetry/otel.rs +++ b/core/src/telemetry/otel.rs @@ -317,12 +317,6 @@ fn default_resource_instance() -> &'static Resource { } fn default_resource(override_values: &HashMap) -> Resource { - let override_kvs = override_values - .iter() - .map(|(k, v)| KeyValue::new(k.clone(), v.clone())); - let override_resource = Resource::builder_empty() - .with_attributes(override_kvs) - .build(); Resource::builder_empty() .with_attributes( default_resource_instance() @@ -330,7 +324,7 @@ fn default_resource(override_values: &HashMap) -> Resource { .map(|(k, v)| KeyValue::new(k.clone(), v.clone())), ) .with_attributes( - override_resource + override_values .iter() .map(|(k, v)| KeyValue::new(k.clone(), v.clone())), ) diff --git a/tests/integ_tests/metrics_tests.rs b/tests/integ_tests/metrics_tests.rs index a2ffd0f20..fd6c4ba26 100644 --- a/tests/integ_tests/metrics_tests.rs +++ b/tests/integ_tests/metrics_tests.rs @@ -1,13 +1,7 @@ use crate::integ_tests::mk_nexus_endpoint; use anyhow::anyhow; use assert_matches::assert_matches; -use std::{ - collections::HashMap, - env, - string::ToString, - sync::Arc, - time::Duration, -}; +use std::{collections::HashMap, env, string::ToString, sync::Arc, time::Duration}; use temporal_client::{ REQUEST_LATENCY_HISTOGRAM_NAME, WorkflowClientTrait, WorkflowOptions, WorkflowService, }; From 0c0ae61a82ad9aabbd42b9f6bdf2d4a4c7529a10 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Tue, 6 May 2025 12:04:07 -0700 Subject: [PATCH 8/8] Add (currently unusuable) fallback handler initializer --- README.md | 8 +- core-api/src/telemetry.rs | 39 +++++++++- core/Cargo.toml | 7 +- core/src/telemetry/mod.rs | 142 ++++++++++++++++++----------------- tests/global_metric_tests.rs | 116 ++++++++++++++++++++++++++++ 5 files changed, 234 insertions(+), 78 deletions(-) create mode 100644 tests/global_metric_tests.rs diff --git a/README.md b/README.md index 1151f71ba..df09bad0c 100644 --- a/README.md +++ b/README.md @@ -64,12 +64,12 @@ You can run it using: ## Debugging -The crate uses [tracing](https://github.com/tokio-rs/tracing) to help with debugging. To enable -it for a test, insert the below snippet at the start of the test. By default, tracing data is output -to stdout in a (reasonably) pretty manner. +The crate uses [tracing](https://github.com/tokio-rs/tracing) to help with debugging. To enable it +globally for tests, insert the below snippet at the start of the test. By default, tracing data is +output to stdout in a (reasonably) pretty manner. ```rust -crate::telemetry::test_telem_console(); +crate::telemetry::telemetry_init_fallback(); ``` The passed in options to initialization can be customized to export to an OTel collector, etc. diff --git a/core-api/src/telemetry.rs b/core-api/src/telemetry.rs index 7b3367632..503f2bfee 100644 --- a/core-api/src/telemetry.rs +++ b/core-api/src/telemetry.rs @@ -3,12 +3,12 @@ pub mod metrics; use crate::telemetry::metrics::CoreMeter; use std::{ collections::HashMap, - fmt::Debug, + fmt::{Debug, Formatter}, net::SocketAddr, sync::Arc, time::{Duration, SystemTime, UNIX_EPOCH}, }; -use tracing_core::Level; +use tracing_core::{Level, Subscriber}; use url::Url; pub static METRIC_PREFIX: &str = "temporal_"; @@ -27,7 +27,7 @@ pub trait CoreTelemetry { } /// Telemetry configuration options. Construct with [TelemetryOptionsBuilder] -#[derive(Debug, Clone, derive_builder::Builder)] +#[derive(Clone, derive_builder::Builder)] #[non_exhaustive] pub struct TelemetryOptions { /// Optional logger - set as None to disable. @@ -45,6 +45,39 @@ pub struct TelemetryOptions { /// A prefix to be applied to all core-created metrics. Defaults to "temporal_". #[builder(default = "METRIC_PREFIX.to_string()")] pub metric_prefix: String, + /// If provided, logging config will be ignored and this explicit subscriber will be used for + /// all logging and traces. + #[builder(setter(strip_option), default)] + pub subscriber_override: Option>, +} +impl Debug for TelemetryOptions { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + #[derive(Debug)] + #[allow(dead_code)] + struct TelemetryOptions<'a> { + logging: &'a Option, + metrics: &'a Option>, + attach_service_name: &'a bool, + metric_prefix: &'a str, + } + let Self { + logging, + metrics, + attach_service_name, + metric_prefix, + .. + } = self; + + Debug::fmt( + &TelemetryOptions { + logging, + metrics, + attach_service_name, + metric_prefix, + }, + f, + ) + } } /// Options for exporting to an OpenTelemetry Collector diff --git a/core/Cargo.toml b/core/Cargo.toml index 501b2264b..fc2410f8b 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -44,7 +44,7 @@ lru = "0.13" mockall = "0.13" opentelemetry = { workspace = true, features = ["metrics"], optional = true } opentelemetry_sdk = { version = "0.29", features = ["rt-tokio", "metrics", "spec_unstable_metrics_views"], optional = true } -opentelemetry-otlp = { version = "0.29", features = ["tokio", "metrics", "tls", "http-proto", "reqwest-client", "grpc-tonic" ], optional = true } +opentelemetry-otlp = { version = "0.29", features = ["tokio", "metrics", "tls", "http-proto", "reqwest-client", "grpc-tonic"], optional = true } opentelemetry-prometheus = { version = "0.29", optional = true } parking_lot = { version = "0.12", features = ["send_guard"] } pid = "4.0" @@ -115,6 +115,11 @@ name = "manual_tests" path = "../tests/manual_tests.rs" test = false +[[test]] +name = "global_metric_tests" +path = "../tests/global_metric_tests.rs" +test = false + [[bench]] name = "workflow_replay" harness = false diff --git a/core/src/telemetry/mod.rs b/core/src/telemetry/mod.rs index 1854b54a2..e4ad7c394 100644 --- a/core/src/telemetry/mod.rs +++ b/core/src/telemetry/mod.rs @@ -33,7 +33,7 @@ use std::{ }, }; use temporal_sdk_core_api::telemetry::{ - CoreLog, CoreTelemetry, Logger, TelemetryOptions, + CoreLog, CoreTelemetry, Logger, TelemetryOptions, TelemetryOptionsBuilder, metrics::{CoreMeter, MetricKeyValue, NewAttributes, TemporalMeter}, }; use tracing::{Level, Subscriber}; @@ -173,54 +173,59 @@ pub fn telemetry_init(opts: TelemetryOptions) -> Result { - // This is silly dupe but can't be avoided without boxing. - if env::var("TEMPORAL_CORE_PRETTY_LOGS").is_ok() { - console_pretty_layer = Some( - tracing_subscriber::fmt::layer() - .with_target(false) - .event_format( - tracing_subscriber::fmt::format() - .pretty() - .with_source_location(false), - ) - .with_filter(EnvFilter::new(filter)), - ) - } else { - console_compact_layer = Some( - tracing_subscriber::fmt::layer() - .with_target(false) - .event_format( - tracing_subscriber::fmt::format() - .compact() - .with_source_location(false), - ) - .with_filter(EnvFilter::new(filter)), - ) + let tracing_sub = if let Some(ts) = opts.subscriber_override { + Some(ts) + } else { + opts.logging.map(|logger| { + match logger { + Logger::Console { filter } => { + // This is silly dupe but can't be avoided without boxing. + if env::var("TEMPORAL_CORE_PRETTY_LOGS").is_ok() { + console_pretty_layer = Some( + tracing_subscriber::fmt::layer() + .with_target(false) + .event_format( + tracing_subscriber::fmt::format() + .pretty() + .with_source_location(false), + ) + .with_filter(EnvFilter::new(filter)), + ) + } else { + console_compact_layer = Some( + tracing_subscriber::fmt::layer() + .with_target(false) + .event_format( + tracing_subscriber::fmt::format() + .compact() + .with_source_location(false), + ) + .with_filter(EnvFilter::new(filter)), + ) + } } - } - Logger::Forward { filter } => { - let (export_layer, lo) = - CoreLogConsumerLayer::new_buffered(FORWARD_LOG_BUFFER_SIZE); - logs_out = Some(Mutex::new(lo)); - forward_layer = Some(export_layer.with_filter(EnvFilter::new(filter))); - } - Logger::Push { filter, consumer } => { - forward_layer = - Some(CoreLogConsumerLayer::new(consumer).with_filter(EnvFilter::new(filter))); - } - }; - let reg = tracing_subscriber::registry() - .with(console_pretty_layer) - .with(console_compact_layer) - .with(forward_layer); + Logger::Forward { filter } => { + let (export_layer, lo) = + CoreLogConsumerLayer::new_buffered(FORWARD_LOG_BUFFER_SIZE); + logs_out = Some(Mutex::new(lo)); + forward_layer = Some(export_layer.with_filter(EnvFilter::new(filter))); + } + Logger::Push { filter, consumer } => { + forward_layer = Some( + CoreLogConsumerLayer::new(consumer).with_filter(EnvFilter::new(filter)), + ); + } + }; + let reg = tracing_subscriber::registry() + .with(console_pretty_layer) + .with(console_compact_layer) + .with(forward_layer); - #[cfg(feature = "tokio-console")] - let reg = reg.with(console_subscriber::spawn()); - Arc::new(reg) as Arc - }); + #[cfg(feature = "tokio-console")] + let reg = reg.with(console_subscriber::spawn()); + Arc::new(reg) as Arc + }) + }; Ok(TelemetryInstance::new( tracing_sub, @@ -231,6 +236,9 @@ pub fn telemetry_init(opts: TelemetryOptions) -> Result Result<(), anyhow::Error> { @@ -247,8 +255,23 @@ pub fn telemetry_init_global(opts: TelemetryOptions) -> Result<(), anyhow::Error Ok(()) } -#[cfg(test)] -pub use test_initters::*; +/// WARNING: Calling can cause panics because of https://github.com/tokio-rs/tracing/issues/1656 +/// Lang must not start using until resolved +/// +/// Initialize the fallback global handler. All lang SDKs should call this somewhere, once, at +/// startup, as it initializes a fallback handler for any dependencies (looking at you, otel) that +/// don't provide good ways to customize their tracing usage. It sets a WARN-level global filter +/// that uses the default console logger. +pub fn telemetry_init_fallback() -> Result<(), anyhow::Error> { + telemetry_init_global( + TelemetryOptionsBuilder::default() + .logging(Logger::Console { + filter: construct_filter_string(Level::DEBUG, Level::WARN), + }) + .build()?, + )?; + Ok(()) +} /// A trait for using [Display] on the contents of vecs, etc, which don't implement it. /// @@ -275,24 +298,3 @@ where format!("[{}]", self.iter().format(",")) } } - -/// Helpers for test initialization -#[cfg(test)] -pub mod test_initters { - use super::*; - use temporal_sdk_core_api::telemetry::TelemetryOptionsBuilder; - - /// Turn on logging to the console - #[allow(dead_code)] // Not always used, called to enable for debugging when needed - pub fn test_telem_console() { - telemetry_init_global( - TelemetryOptionsBuilder::default() - .logging(Logger::Console { - filter: construct_filter_string(Level::DEBUG, Level::WARN), - }) - .build() - .unwrap(), - ) - .unwrap(); - } -} diff --git a/tests/global_metric_tests.rs b/tests/global_metric_tests.rs new file mode 100644 index 000000000..6ccddba61 --- /dev/null +++ b/tests/global_metric_tests.rs @@ -0,0 +1,116 @@ +use parking_lot::Mutex; +use std::{sync::Arc, time::Duration}; +use temporal_sdk_core::{ + CoreRuntime, + telemetry::{build_otlp_metric_exporter, construct_filter_string, telemetry_init_global}, +}; +use temporal_sdk_core_api::telemetry::{ + Logger, OtelCollectorOptionsBuilder, TelemetryOptionsBuilder, metrics::CoreMeter, +}; +use temporal_sdk_core_test_utils::CoreWfStarter; +use tracing::Level; +use tracing_subscriber::fmt::MakeWriter; + +struct CapturingWriter { + buf: Arc>>, +} + +impl MakeWriter<'_> for CapturingWriter { + type Writer = CapturingHandle; + + fn make_writer(&self) -> Self::Writer { + CapturingHandle(self.buf.clone()) + } +} + +struct CapturingHandle(Arc>>); + +impl std::io::Write for CapturingHandle { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + let mut b = self.0.lock(); + b.extend_from_slice(buf); + Ok(buf.len()) + } + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} + +// TODO: This test is not actually run in CI right now because for it to actually work requires +// a number of fixes in upstream libraries: +// +// * It regularly panics because of: https://github.com/tokio-rs/tracing/issues/1656 +// * Otel doesn't appear to actually be logging any warnings/errors on connection failure +// * This whole thing is supposed to show a workaround for https://github.com/open-telemetry/opentelemetry-rust/issues/2697 +#[tokio::test] +async fn otel_errors_logged_as_errors() { + // Set up tracing subscriber to capture ERROR logs + let logs = Arc::new(Mutex::new(Vec::new())); + let writer = CapturingWriter { buf: logs.clone() }; + let subscriber = Arc::new( + tracing_subscriber::fmt() + .with_writer(writer) + .with_env_filter("debug") + .finish(), + ); + let opts = OtelCollectorOptionsBuilder::default() + .url("https://localhost:12345/v1/metrics".parse().unwrap()) // Nothing bound on that port + .build() + .unwrap(); + let exporter = build_otlp_metric_exporter(opts).unwrap(); + + // Global initialization is needed to capture (some) otel logging. + telemetry_init_global( + TelemetryOptionsBuilder::default() + .subscriber_override(subscriber) + .build() + .unwrap(), + ) + .unwrap(); + + let rt = CoreRuntime::new_assume_tokio( + TelemetryOptionsBuilder::default() + .metrics(Arc::new(exporter) as Arc) + // Importantly, _not_ using subscriber override, is using console. + .logging(Logger::Console { + filter: construct_filter_string(Level::INFO, Level::WARN), + }) + .build() + .unwrap(), + ) + .unwrap(); + let mut starter = CoreWfStarter::new_with_runtime("otel_errors_logged_as_errors", rt); + let _worker = starter.get_worker().await; + + tracing::debug!("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ should be in global log"); + + // Wait to allow exporter to attempt sending metrics and fail. + // Windows takes a while to fail the network attempt for some reason so 5s. + tokio::time::sleep(Duration::from_secs(5)).await; + + let logs = logs.lock(); + let log_str = String::from_utf8_lossy(&logs).into_owned(); + drop(logs); + + // The core worker _isn't_ using the fallback, and shouldn't be captured + assert!( + !log_str.contains("Initializing worker"), + "Core logging shouldn't have been caught by fallback" + ); + assert!( + log_str.contains("@@@@@@@@@"), + "Expected fallback log not found in logs: {}", + log_str + ); + // TODO: OTel just doesn't actually log useful errors right now 🤷, see issues at top of test + assert!( + log_str.contains("ERROR"), + "Expected ERROR log not found in logs: {}", + log_str + ); + assert!( + log_str.contains("Metrics exporter otlp failed with the grpc server returns error"), + "Expected an OTel exporter error message in logs: {}", + log_str + ); +}