diff --git a/Cargo.toml b/Cargo.toml index 4ada4bf3d..0dd3e7749 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ derive_more = { version = "2.0", features = ["constructor", "display", "from", " thiserror = "2" tonic = "0.12" tonic-build = "0.12" -opentelemetry = { version = "0.26", features = ["metrics"] } +opentelemetry = { version = "0.29", features = ["metrics"] } prost = "0.13" prost-types = "0.13" diff --git a/README.md b/README.md index 1151f71ba..df09bad0c 100644 --- a/README.md +++ b/README.md @@ -64,12 +64,12 @@ You can run it using: ## Debugging -The crate uses [tracing](https://github.com/tokio-rs/tracing) to help with debugging. To enable -it for a test, insert the below snippet at the start of the test. By default, tracing data is output -to stdout in a (reasonably) pretty manner. +The crate uses [tracing](https://github.com/tokio-rs/tracing) to help with debugging. To enable it +globally for tests, insert the below snippet at the start of the test. By default, tracing data is +output to stdout in a (reasonably) pretty manner. ```rust -crate::telemetry::test_telem_console(); +crate::telemetry::telemetry_init_fallback(); ``` The passed in options to initialization can be customized to export to an OTel collector, etc. diff --git a/core-api/src/telemetry.rs b/core-api/src/telemetry.rs index 7b3367632..503f2bfee 100644 --- a/core-api/src/telemetry.rs +++ b/core-api/src/telemetry.rs @@ -3,12 +3,12 @@ pub mod metrics; use crate::telemetry::metrics::CoreMeter; use std::{ collections::HashMap, - fmt::Debug, + fmt::{Debug, Formatter}, net::SocketAddr, sync::Arc, time::{Duration, SystemTime, UNIX_EPOCH}, }; -use tracing_core::Level; +use tracing_core::{Level, Subscriber}; use url::Url; pub static METRIC_PREFIX: &str = "temporal_"; @@ -27,7 +27,7 @@ pub trait CoreTelemetry { } /// Telemetry configuration options. Construct with [TelemetryOptionsBuilder] -#[derive(Debug, Clone, derive_builder::Builder)] +#[derive(Clone, derive_builder::Builder)] #[non_exhaustive] pub struct TelemetryOptions { /// Optional logger - set as None to disable. @@ -45,6 +45,39 @@ pub struct TelemetryOptions { /// A prefix to be applied to all core-created metrics. Defaults to "temporal_". #[builder(default = "METRIC_PREFIX.to_string()")] pub metric_prefix: String, + /// If provided, logging config will be ignored and this explicit subscriber will be used for + /// all logging and traces. + #[builder(setter(strip_option), default)] + pub subscriber_override: Option>, +} +impl Debug for TelemetryOptions { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + #[derive(Debug)] + #[allow(dead_code)] + struct TelemetryOptions<'a> { + logging: &'a Option, + metrics: &'a Option>, + attach_service_name: &'a bool, + metric_prefix: &'a str, + } + let Self { + logging, + metrics, + attach_service_name, + metric_prefix, + .. + } = self; + + Debug::fmt( + &TelemetryOptions { + logging, + metrics, + attach_service_name, + metric_prefix, + }, + f, + ) + } } /// Options for exporting to an OpenTelemetry Collector diff --git a/core/Cargo.toml b/core/Cargo.toml index fbf048a15..fc2410f8b 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -43,13 +43,13 @@ itertools = "0.14" lru = "0.13" mockall = "0.13" opentelemetry = { workspace = true, features = ["metrics"], optional = true } -opentelemetry_sdk = { version = "0.26", features = ["rt-tokio", "metrics"], optional = true } -opentelemetry-otlp = { version = "0.26", features = ["tokio", "metrics", "tls", "http-proto", "reqwest-client", ], optional = true } -opentelemetry-prometheus = { git = "https://github.com/open-telemetry/opentelemetry-rust.git", rev = "e911383", optional = true } +opentelemetry_sdk = { version = "0.29", features = ["rt-tokio", "metrics", "spec_unstable_metrics_views"], optional = true } +opentelemetry-otlp = { version = "0.29", features = ["tokio", "metrics", "tls", "http-proto", "reqwest-client", "grpc-tonic"], optional = true } +opentelemetry-prometheus = { version = "0.29", optional = true } parking_lot = { version = "0.12", features = ["send_guard"] } pid = "4.0" pin-project = "1.0" -prometheus = "0.13" +prometheus = "0.14" prost = { workspace = true } prost-types = { version = "0.6", package = "prost-wkt-types" } rand = "0.9" @@ -115,6 +115,11 @@ name = "manual_tests" path = "../tests/manual_tests.rs" test = false +[[test]] +name = "global_metric_tests" +path = "../tests/global_metric_tests.rs" +test = false + [[bench]] name = "workflow_replay" harness = false diff --git a/core/src/telemetry/mod.rs b/core/src/telemetry/mod.rs index 1854b54a2..e4ad7c394 100644 --- a/core/src/telemetry/mod.rs +++ b/core/src/telemetry/mod.rs @@ -33,7 +33,7 @@ use std::{ }, }; use temporal_sdk_core_api::telemetry::{ - CoreLog, CoreTelemetry, Logger, TelemetryOptions, + CoreLog, CoreTelemetry, Logger, TelemetryOptions, TelemetryOptionsBuilder, metrics::{CoreMeter, MetricKeyValue, NewAttributes, TemporalMeter}, }; use tracing::{Level, Subscriber}; @@ -173,54 +173,59 @@ pub fn telemetry_init(opts: TelemetryOptions) -> Result { - // This is silly dupe but can't be avoided without boxing. - if env::var("TEMPORAL_CORE_PRETTY_LOGS").is_ok() { - console_pretty_layer = Some( - tracing_subscriber::fmt::layer() - .with_target(false) - .event_format( - tracing_subscriber::fmt::format() - .pretty() - .with_source_location(false), - ) - .with_filter(EnvFilter::new(filter)), - ) - } else { - console_compact_layer = Some( - tracing_subscriber::fmt::layer() - .with_target(false) - .event_format( - tracing_subscriber::fmt::format() - .compact() - .with_source_location(false), - ) - .with_filter(EnvFilter::new(filter)), - ) + let tracing_sub = if let Some(ts) = opts.subscriber_override { + Some(ts) + } else { + opts.logging.map(|logger| { + match logger { + Logger::Console { filter } => { + // This is silly dupe but can't be avoided without boxing. + if env::var("TEMPORAL_CORE_PRETTY_LOGS").is_ok() { + console_pretty_layer = Some( + tracing_subscriber::fmt::layer() + .with_target(false) + .event_format( + tracing_subscriber::fmt::format() + .pretty() + .with_source_location(false), + ) + .with_filter(EnvFilter::new(filter)), + ) + } else { + console_compact_layer = Some( + tracing_subscriber::fmt::layer() + .with_target(false) + .event_format( + tracing_subscriber::fmt::format() + .compact() + .with_source_location(false), + ) + .with_filter(EnvFilter::new(filter)), + ) + } } - } - Logger::Forward { filter } => { - let (export_layer, lo) = - CoreLogConsumerLayer::new_buffered(FORWARD_LOG_BUFFER_SIZE); - logs_out = Some(Mutex::new(lo)); - forward_layer = Some(export_layer.with_filter(EnvFilter::new(filter))); - } - Logger::Push { filter, consumer } => { - forward_layer = - Some(CoreLogConsumerLayer::new(consumer).with_filter(EnvFilter::new(filter))); - } - }; - let reg = tracing_subscriber::registry() - .with(console_pretty_layer) - .with(console_compact_layer) - .with(forward_layer); + Logger::Forward { filter } => { + let (export_layer, lo) = + CoreLogConsumerLayer::new_buffered(FORWARD_LOG_BUFFER_SIZE); + logs_out = Some(Mutex::new(lo)); + forward_layer = Some(export_layer.with_filter(EnvFilter::new(filter))); + } + Logger::Push { filter, consumer } => { + forward_layer = Some( + CoreLogConsumerLayer::new(consumer).with_filter(EnvFilter::new(filter)), + ); + } + }; + let reg = tracing_subscriber::registry() + .with(console_pretty_layer) + .with(console_compact_layer) + .with(forward_layer); - #[cfg(feature = "tokio-console")] - let reg = reg.with(console_subscriber::spawn()); - Arc::new(reg) as Arc - }); + #[cfg(feature = "tokio-console")] + let reg = reg.with(console_subscriber::spawn()); + Arc::new(reg) as Arc + }) + }; Ok(TelemetryInstance::new( tracing_sub, @@ -231,6 +236,9 @@ pub fn telemetry_init(opts: TelemetryOptions) -> Result Result<(), anyhow::Error> { @@ -247,8 +255,23 @@ pub fn telemetry_init_global(opts: TelemetryOptions) -> Result<(), anyhow::Error Ok(()) } -#[cfg(test)] -pub use test_initters::*; +/// WARNING: Calling can cause panics because of https://github.com/tokio-rs/tracing/issues/1656 +/// Lang must not start using until resolved +/// +/// Initialize the fallback global handler. All lang SDKs should call this somewhere, once, at +/// startup, as it initializes a fallback handler for any dependencies (looking at you, otel) that +/// don't provide good ways to customize their tracing usage. It sets a WARN-level global filter +/// that uses the default console logger. +pub fn telemetry_init_fallback() -> Result<(), anyhow::Error> { + telemetry_init_global( + TelemetryOptionsBuilder::default() + .logging(Logger::Console { + filter: construct_filter_string(Level::DEBUG, Level::WARN), + }) + .build()?, + )?; + Ok(()) +} /// A trait for using [Display] on the contents of vecs, etc, which don't implement it. /// @@ -275,24 +298,3 @@ where format!("[{}]", self.iter().format(",")) } } - -/// Helpers for test initialization -#[cfg(test)] -pub mod test_initters { - use super::*; - use temporal_sdk_core_api::telemetry::TelemetryOptionsBuilder; - - /// Turn on logging to the console - #[allow(dead_code)] // Not always used, called to enable for debugging when needed - pub fn test_telem_console() { - telemetry_init_global( - TelemetryOptionsBuilder::default() - .logging(Logger::Console { - filter: construct_filter_string(Level::DEBUG, Level::WARN), - }) - .build() - .unwrap(), - ) - .unwrap(); - } -} diff --git a/core/src/telemetry/otel.rs b/core/src/telemetry/otel.rs index 187ef1449..bd8a29569 100644 --- a/core/src/telemetry/otel.rs +++ b/core/src/telemetry/otel.rs @@ -11,17 +11,16 @@ use super::{ }; use crate::{abstractions::dbg_panic, telemetry::metrics::DEFAULT_S_BUCKETS}; use opentelemetry::{ - self, Key, KeyValue, Value, global, + self, Key, KeyValue, Value, metrics::{Meter, MeterProvider as MeterProviderT}, }; -use opentelemetry_otlp::WithExportConfig; +use opentelemetry_otlp::{WithExportConfig, WithHttpConfig, WithTonicConfig}; use opentelemetry_sdk::{ Resource, metrics::{ - Aggregation, Instrument, InstrumentKind, MeterProviderBuilder, PeriodicReader, - SdkMeterProvider, View, data::Temporality, new_view, reader::TemporalitySelector, + Aggregation, Instrument, InstrumentKind, MeterProviderBuilder, MetricError, PeriodicReader, + SdkMeterProvider, Temporality, View, new_view, }, - runtime, }; use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration}; use temporal_sdk_core_api::telemetry::{ @@ -35,10 +34,10 @@ use temporal_sdk_core_api::telemetry::{ use tokio::task::AbortHandle; use tonic::{metadata::MetadataMap, transport::ClientTlsConfig}; -fn histo_view( - metric_name: &'static str, - use_seconds: bool, -) -> opentelemetry::metrics::Result> { +/// A specialized `Result` type for metric operations. +type Result = std::result::Result; + +fn histo_view(metric_name: &'static str, use_seconds: bool) -> Result> { let buckets = default_buckets_for(metric_name, use_seconds); new_view( Instrument::new().name(format!("*{metric_name}")), @@ -56,7 +55,7 @@ pub(super) fn augment_meter_provider_with_defaults( global_tags: &HashMap, use_seconds: bool, bucket_overrides: HistogramBucketOverrides, -) -> opentelemetry::metrics::Result { +) -> Result { for (name, buckets) in bucket_overrides.overrides { mpb = mpb.with_view(new_view( Instrument::new().name(format!("*{name}")), @@ -117,31 +116,28 @@ pub(super) fn augment_meter_provider_with_defaults( /// Create an OTel meter that can be used as a [CoreMeter] to export metrics over OTLP. pub fn build_otlp_metric_exporter( opts: OtelCollectorOptions, -) -> Result { - global::set_error_handler(|err| { - tracing::error!("{}", err); - })?; +) -> std::result::Result { let exporter = match opts.protocol { OtlpProtocol::Grpc => { - let mut exporter = opentelemetry_otlp::TonicExporterBuilder::default() + let mut exporter = opentelemetry_otlp::MetricExporter::builder() + .with_tonic() .with_endpoint(opts.url.to_string()); if opts.url.scheme() == "https" || opts.url.scheme() == "grpcs" { exporter = exporter.with_tls_config(ClientTlsConfig::new().with_native_roots()); } exporter .with_metadata(MetadataMap::from_headers((&opts.headers).try_into()?)) - .build_metrics_exporter(Box::new(metric_temporality_to_selector( - opts.metric_temporality, - )))? + .with_temporality(metric_temporality_to_temporality(opts.metric_temporality)) + .build()? } - OtlpProtocol::Http => opentelemetry_otlp::HttpExporterBuilder::default() + OtlpProtocol::Http => opentelemetry_otlp::MetricExporter::builder() + .with_http() .with_endpoint(opts.url.to_string()) .with_headers(opts.headers) - .build_metrics_exporter(Box::new(metric_temporality_to_selector( - opts.metric_temporality, - )))?, + .with_temporality(metric_temporality_to_temporality(opts.metric_temporality)) + .build()?, }; - let reader = PeriodicReader::builder(exporter, runtime::Tokio) + let reader = PeriodicReader::builder(exporter) .with_interval(opts.metric_periodicity) .build(); let mp = augment_meter_provider_with_defaults( @@ -170,7 +166,7 @@ pub struct StartedPromServer { /// Requires a Tokio runtime to exist, and will block briefly while binding the server endpoint. pub fn start_prometheus_metric_exporter( opts: PrometheusExporterOptions, -) -> Result { +) -> std::result::Result { let (srv, exporter) = PromServer::new(&opts)?; let meter_provider = augment_meter_provider_with_defaults( MeterProviderBuilder::default().with_reader(exporter), @@ -228,7 +224,7 @@ impl CoreMeter for CoreOtelMeter { .u64_counter(params.name) .with_unit(params.unit) .with_description(params.description) - .init(), + .build(), ) } @@ -238,7 +234,7 @@ impl CoreMeter for CoreOtelMeter { .u64_histogram(params.name) .with_unit(params.unit) .with_description(params.description) - .init(), + .build(), ) } @@ -248,7 +244,7 @@ impl CoreMeter for CoreOtelMeter { .f64_histogram(params.name) .with_unit(params.unit) .with_description(params.description) - .init(), + .build(), ) } @@ -268,7 +264,7 @@ impl CoreMeter for CoreOtelMeter { .u64_gauge(params.name) .with_unit(params.unit) .with_description(params.description) - .init(), + .build(), ) } @@ -278,7 +274,7 @@ impl CoreMeter for CoreOtelMeter { .f64_gauge(params.name) .with_unit(params.unit) .with_description(params.description) - .init(), + .build(), ) } } @@ -303,41 +299,42 @@ fn default_resource_instance() -> &'static Resource { static INSTANCE: OnceLock = OnceLock::new(); INSTANCE.get_or_init(|| { - let resource = Resource::default(); - if resource.get(Key::from("service.name")) == Some(Value::from("unknown_service")) { + let resource = Resource::builder().build(); + if resource.get(&Key::from("service.name")) == Some(Value::from("unknown_service")) { // otel spec recommends to leave service.name as unknown_service but we want to // maintain backwards compatability with existing library behaviour - return resource.merge(&Resource::new([KeyValue::new( - "service.name", - TELEM_SERVICE_NAME, - )])); + return Resource::builder_empty() + .with_attributes( + resource + .iter() + .map(|(k, v)| KeyValue::new(k.clone(), v.clone())), + ) + .with_attribute(KeyValue::new("service.name", TELEM_SERVICE_NAME)) + .build(); } resource }) } fn default_resource(override_values: &HashMap) -> Resource { - let override_kvs = override_values - .iter() - .map(|(k, v)| KeyValue::new(k.clone(), v.clone())); - default_resource_instance() - .clone() - .merge(&Resource::new(override_kvs)) -} - -#[derive(Clone)] -struct ConstantTemporality(Temporality); - -impl TemporalitySelector for ConstantTemporality { - fn temporality(&self, _: InstrumentKind) -> Temporality { - self.0 - } + Resource::builder_empty() + .with_attributes( + default_resource_instance() + .iter() + .map(|(k, v)| KeyValue::new(k.clone(), v.clone())), + ) + .with_attributes( + override_values + .iter() + .map(|(k, v)| KeyValue::new(k.clone(), v.clone())), + ) + .build() } -fn metric_temporality_to_selector(t: MetricTemporality) -> impl TemporalitySelector + Clone { +fn metric_temporality_to_temporality(t: MetricTemporality) -> Temporality { match t { - MetricTemporality::Cumulative => ConstantTemporality(Temporality::Cumulative), - MetricTemporality::Delta => ConstantTemporality(Temporality::Delta), + MetricTemporality::Cumulative => Temporality::Cumulative, + MetricTemporality::Delta => Temporality::Delta, } } @@ -349,7 +346,7 @@ pub(crate) mod tests { #[test] pub(crate) fn default_resource_instance_service_name_default() { let resource = default_resource_instance(); - let service_name = resource.get(Key::from("service.name")); + let service_name = resource.get(&Key::from("service.name")); assert_eq!(service_name, Some(Value::from(TELEM_SERVICE_NAME))); } } diff --git a/tests/global_metric_tests.rs b/tests/global_metric_tests.rs new file mode 100644 index 000000000..6ccddba61 --- /dev/null +++ b/tests/global_metric_tests.rs @@ -0,0 +1,116 @@ +use parking_lot::Mutex; +use std::{sync::Arc, time::Duration}; +use temporal_sdk_core::{ + CoreRuntime, + telemetry::{build_otlp_metric_exporter, construct_filter_string, telemetry_init_global}, +}; +use temporal_sdk_core_api::telemetry::{ + Logger, OtelCollectorOptionsBuilder, TelemetryOptionsBuilder, metrics::CoreMeter, +}; +use temporal_sdk_core_test_utils::CoreWfStarter; +use tracing::Level; +use tracing_subscriber::fmt::MakeWriter; + +struct CapturingWriter { + buf: Arc>>, +} + +impl MakeWriter<'_> for CapturingWriter { + type Writer = CapturingHandle; + + fn make_writer(&self) -> Self::Writer { + CapturingHandle(self.buf.clone()) + } +} + +struct CapturingHandle(Arc>>); + +impl std::io::Write for CapturingHandle { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + let mut b = self.0.lock(); + b.extend_from_slice(buf); + Ok(buf.len()) + } + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} + +// TODO: This test is not actually run in CI right now because for it to actually work requires +// a number of fixes in upstream libraries: +// +// * It regularly panics because of: https://github.com/tokio-rs/tracing/issues/1656 +// * Otel doesn't appear to actually be logging any warnings/errors on connection failure +// * This whole thing is supposed to show a workaround for https://github.com/open-telemetry/opentelemetry-rust/issues/2697 +#[tokio::test] +async fn otel_errors_logged_as_errors() { + // Set up tracing subscriber to capture ERROR logs + let logs = Arc::new(Mutex::new(Vec::new())); + let writer = CapturingWriter { buf: logs.clone() }; + let subscriber = Arc::new( + tracing_subscriber::fmt() + .with_writer(writer) + .with_env_filter("debug") + .finish(), + ); + let opts = OtelCollectorOptionsBuilder::default() + .url("https://localhost:12345/v1/metrics".parse().unwrap()) // Nothing bound on that port + .build() + .unwrap(); + let exporter = build_otlp_metric_exporter(opts).unwrap(); + + // Global initialization is needed to capture (some) otel logging. + telemetry_init_global( + TelemetryOptionsBuilder::default() + .subscriber_override(subscriber) + .build() + .unwrap(), + ) + .unwrap(); + + let rt = CoreRuntime::new_assume_tokio( + TelemetryOptionsBuilder::default() + .metrics(Arc::new(exporter) as Arc) + // Importantly, _not_ using subscriber override, is using console. + .logging(Logger::Console { + filter: construct_filter_string(Level::INFO, Level::WARN), + }) + .build() + .unwrap(), + ) + .unwrap(); + let mut starter = CoreWfStarter::new_with_runtime("otel_errors_logged_as_errors", rt); + let _worker = starter.get_worker().await; + + tracing::debug!("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ should be in global log"); + + // Wait to allow exporter to attempt sending metrics and fail. + // Windows takes a while to fail the network attempt for some reason so 5s. + tokio::time::sleep(Duration::from_secs(5)).await; + + let logs = logs.lock(); + let log_str = String::from_utf8_lossy(&logs).into_owned(); + drop(logs); + + // The core worker _isn't_ using the fallback, and shouldn't be captured + assert!( + !log_str.contains("Initializing worker"), + "Core logging shouldn't have been caught by fallback" + ); + assert!( + log_str.contains("@@@@@@@@@"), + "Expected fallback log not found in logs: {}", + log_str + ); + // TODO: OTel just doesn't actually log useful errors right now 🤷, see issues at top of test + assert!( + log_str.contains("ERROR"), + "Expected ERROR log not found in logs: {}", + log_str + ); + assert!( + log_str.contains("Metrics exporter otlp failed with the grpc server returns error"), + "Expected an OTel exporter error message in logs: {}", + log_str + ); +} diff --git a/tests/integ_tests/metrics_tests.rs b/tests/integ_tests/metrics_tests.rs index eb06e9fcf..fd6c4ba26 100644 --- a/tests/integ_tests/metrics_tests.rs +++ b/tests/integ_tests/metrics_tests.rs @@ -1,13 +1,7 @@ use crate::integ_tests::mk_nexus_endpoint; use anyhow::anyhow; use assert_matches::assert_matches; -use std::{ - collections::HashMap, - env, - string::ToString, - sync::{Arc, Mutex}, - time::Duration, -}; +use std::{collections::HashMap, env, string::ToString, sync::Arc, time::Duration}; use temporal_client::{ REQUEST_LATENCY_HISTOGRAM_NAME, WorkflowClientTrait, WorkflowOptions, WorkflowService, }; @@ -60,7 +54,6 @@ use temporal_sdk_core_test_utils::{ get_integ_server_options, get_integ_telem_options, prom_metrics, }; use tokio::{join, sync::Barrier}; -use tracing_subscriber::fmt::MakeWriter; use url::Url; pub(crate) async fn get_text(endpoint: String) -> String { @@ -1111,70 +1104,3 @@ async fn evict_on_complete_does_not_count_as_forced_eviction() { // Metric shouldn't show up at all, since it's zero the whole time. assert!(!body.contains("temporal_sticky_cache_total_forced_eviction")); } - -struct CapturingWriter { - buf: Arc>>, -} - -impl MakeWriter<'_> for CapturingWriter { - type Writer = CapturingHandle; - - fn make_writer(&self) -> Self::Writer { - CapturingHandle(self.buf.clone()) - } -} - -struct CapturingHandle(Arc>>); - -impl std::io::Write for CapturingHandle { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - let mut b = self.0.lock().unwrap(); - b.extend_from_slice(buf); - Ok(buf.len()) - } - fn flush(&mut self) -> std::io::Result<()> { - Ok(()) - } -} - -#[tokio::test] -async fn otel_errors_logged_as_errors() { - // Set up tracing subscriber to capture ERROR logs - let logs = Arc::new(Mutex::new(Vec::new())); - let writer = CapturingWriter { buf: logs.clone() }; - let subscriber = tracing_subscriber::fmt().with_writer(writer).finish(); - let _guard = tracing::subscriber::set_default(subscriber); - - let opts = OtelCollectorOptionsBuilder::default() - .url("https://localhost:12345/v1/metrics".parse().unwrap()) // Nothing bound on that port - .build() - .unwrap(); - let exporter = build_otlp_metric_exporter(opts).unwrap(); - - let telemopts = TelemetryOptionsBuilder::default() - .metrics(Arc::new(exporter) as Arc) - .build() - .unwrap(); - - let rt = CoreRuntime::new_assume_tokio(telemopts).unwrap(); - let mut starter = CoreWfStarter::new_with_runtime("otel_errors_logged_as_errors", rt); - let _worker = starter.get_worker().await; - - // Wait to allow exporter to attempt sending metrics and fail. - // Windows takes a while to fail the network attempt for some reason so 5s. - tokio::time::sleep(Duration::from_secs(5)).await; - - let logs = logs.lock().unwrap(); - let log_str = String::from_utf8_lossy(&logs); - - assert!( - log_str.contains("ERROR"), - "Expected ERROR log not found in logs: {}", - log_str - ); - assert!( - log_str.contains("Metrics exporter otlp failed with the grpc server returns error"), - "Expected an OTel exporter error message in logs: {}", - log_str - ); -}