Skip to content

Commit 0c0ae61

Browse files
committed
Add (currently unusuable) fallback handler initializer
1 parent 1044aee commit 0c0ae61

File tree

5 files changed

+234
-78
lines changed

5 files changed

+234
-78
lines changed

README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,12 @@ You can run it using:
6464

6565
## Debugging
6666

67-
The crate uses [tracing](https://github.com/tokio-rs/tracing) to help with debugging. To enable
68-
it for a test, insert the below snippet at the start of the test. By default, tracing data is output
69-
to stdout in a (reasonably) pretty manner.
67+
The crate uses [tracing](https://github.com/tokio-rs/tracing) to help with debugging. To enable it
68+
globally for tests, insert the below snippet at the start of the test. By default, tracing data is
69+
output to stdout in a (reasonably) pretty manner.
7070

7171
```rust
72-
crate::telemetry::test_telem_console();
72+
crate::telemetry::telemetry_init_fallback();
7373
```
7474

7575
The passed in options to initialization can be customized to export to an OTel collector, etc.

core-api/src/telemetry.rs

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@ pub mod metrics;
33
use crate::telemetry::metrics::CoreMeter;
44
use std::{
55
collections::HashMap,
6-
fmt::Debug,
6+
fmt::{Debug, Formatter},
77
net::SocketAddr,
88
sync::Arc,
99
time::{Duration, SystemTime, UNIX_EPOCH},
1010
};
11-
use tracing_core::Level;
11+
use tracing_core::{Level, Subscriber};
1212
use url::Url;
1313

1414
pub static METRIC_PREFIX: &str = "temporal_";
@@ -27,7 +27,7 @@ pub trait CoreTelemetry {
2727
}
2828

2929
/// Telemetry configuration options. Construct with [TelemetryOptionsBuilder]
30-
#[derive(Debug, Clone, derive_builder::Builder)]
30+
#[derive(Clone, derive_builder::Builder)]
3131
#[non_exhaustive]
3232
pub struct TelemetryOptions {
3333
/// Optional logger - set as None to disable.
@@ -45,6 +45,39 @@ pub struct TelemetryOptions {
4545
/// A prefix to be applied to all core-created metrics. Defaults to "temporal_".
4646
#[builder(default = "METRIC_PREFIX.to_string()")]
4747
pub metric_prefix: String,
48+
/// If provided, logging config will be ignored and this explicit subscriber will be used for
49+
/// all logging and traces.
50+
#[builder(setter(strip_option), default)]
51+
pub subscriber_override: Option<Arc<dyn Subscriber + Send + Sync>>,
52+
}
53+
impl Debug for TelemetryOptions {
54+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
55+
#[derive(Debug)]
56+
#[allow(dead_code)]
57+
struct TelemetryOptions<'a> {
58+
logging: &'a Option<Logger>,
59+
metrics: &'a Option<Arc<dyn CoreMeter>>,
60+
attach_service_name: &'a bool,
61+
metric_prefix: &'a str,
62+
}
63+
let Self {
64+
logging,
65+
metrics,
66+
attach_service_name,
67+
metric_prefix,
68+
..
69+
} = self;
70+
71+
Debug::fmt(
72+
&TelemetryOptions {
73+
logging,
74+
metrics,
75+
attach_service_name,
76+
metric_prefix,
77+
},
78+
f,
79+
)
80+
}
4881
}
4982

5083
/// Options for exporting to an OpenTelemetry Collector

core/Cargo.toml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ lru = "0.13"
4444
mockall = "0.13"
4545
opentelemetry = { workspace = true, features = ["metrics"], optional = true }
4646
opentelemetry_sdk = { version = "0.29", features = ["rt-tokio", "metrics", "spec_unstable_metrics_views"], optional = true }
47-
opentelemetry-otlp = { version = "0.29", features = ["tokio", "metrics", "tls", "http-proto", "reqwest-client", "grpc-tonic" ], optional = true }
47+
opentelemetry-otlp = { version = "0.29", features = ["tokio", "metrics", "tls", "http-proto", "reqwest-client", "grpc-tonic"], optional = true }
4848
opentelemetry-prometheus = { version = "0.29", optional = true }
4949
parking_lot = { version = "0.12", features = ["send_guard"] }
5050
pid = "4.0"
@@ -115,6 +115,11 @@ name = "manual_tests"
115115
path = "../tests/manual_tests.rs"
116116
test = false
117117

118+
[[test]]
119+
name = "global_metric_tests"
120+
path = "../tests/global_metric_tests.rs"
121+
test = false
122+
118123
[[bench]]
119124
name = "workflow_replay"
120125
harness = false

core/src/telemetry/mod.rs

Lines changed: 72 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use std::{
3333
},
3434
};
3535
use temporal_sdk_core_api::telemetry::{
36-
CoreLog, CoreTelemetry, Logger, TelemetryOptions,
36+
CoreLog, CoreTelemetry, Logger, TelemetryOptions, TelemetryOptionsBuilder,
3737
metrics::{CoreMeter, MetricKeyValue, NewAttributes, TemporalMeter},
3838
};
3939
use tracing::{Level, Subscriber};
@@ -173,54 +173,59 @@ pub fn telemetry_init(opts: TelemetryOptions) -> Result<TelemetryInstance, anyho
173173
let mut forward_layer = None;
174174
// ===================================
175175

176-
let tracing_sub = opts.logging.map(|logger| {
177-
match logger {
178-
Logger::Console { filter } => {
179-
// This is silly dupe but can't be avoided without boxing.
180-
if env::var("TEMPORAL_CORE_PRETTY_LOGS").is_ok() {
181-
console_pretty_layer = Some(
182-
tracing_subscriber::fmt::layer()
183-
.with_target(false)
184-
.event_format(
185-
tracing_subscriber::fmt::format()
186-
.pretty()
187-
.with_source_location(false),
188-
)
189-
.with_filter(EnvFilter::new(filter)),
190-
)
191-
} else {
192-
console_compact_layer = Some(
193-
tracing_subscriber::fmt::layer()
194-
.with_target(false)
195-
.event_format(
196-
tracing_subscriber::fmt::format()
197-
.compact()
198-
.with_source_location(false),
199-
)
200-
.with_filter(EnvFilter::new(filter)),
201-
)
176+
let tracing_sub = if let Some(ts) = opts.subscriber_override {
177+
Some(ts)
178+
} else {
179+
opts.logging.map(|logger| {
180+
match logger {
181+
Logger::Console { filter } => {
182+
// This is silly dupe but can't be avoided without boxing.
183+
if env::var("TEMPORAL_CORE_PRETTY_LOGS").is_ok() {
184+
console_pretty_layer = Some(
185+
tracing_subscriber::fmt::layer()
186+
.with_target(false)
187+
.event_format(
188+
tracing_subscriber::fmt::format()
189+
.pretty()
190+
.with_source_location(false),
191+
)
192+
.with_filter(EnvFilter::new(filter)),
193+
)
194+
} else {
195+
console_compact_layer = Some(
196+
tracing_subscriber::fmt::layer()
197+
.with_target(false)
198+
.event_format(
199+
tracing_subscriber::fmt::format()
200+
.compact()
201+
.with_source_location(false),
202+
)
203+
.with_filter(EnvFilter::new(filter)),
204+
)
205+
}
202206
}
203-
}
204-
Logger::Forward { filter } => {
205-
let (export_layer, lo) =
206-
CoreLogConsumerLayer::new_buffered(FORWARD_LOG_BUFFER_SIZE);
207-
logs_out = Some(Mutex::new(lo));
208-
forward_layer = Some(export_layer.with_filter(EnvFilter::new(filter)));
209-
}
210-
Logger::Push { filter, consumer } => {
211-
forward_layer =
212-
Some(CoreLogConsumerLayer::new(consumer).with_filter(EnvFilter::new(filter)));
213-
}
214-
};
215-
let reg = tracing_subscriber::registry()
216-
.with(console_pretty_layer)
217-
.with(console_compact_layer)
218-
.with(forward_layer);
207+
Logger::Forward { filter } => {
208+
let (export_layer, lo) =
209+
CoreLogConsumerLayer::new_buffered(FORWARD_LOG_BUFFER_SIZE);
210+
logs_out = Some(Mutex::new(lo));
211+
forward_layer = Some(export_layer.with_filter(EnvFilter::new(filter)));
212+
}
213+
Logger::Push { filter, consumer } => {
214+
forward_layer = Some(
215+
CoreLogConsumerLayer::new(consumer).with_filter(EnvFilter::new(filter)),
216+
);
217+
}
218+
};
219+
let reg = tracing_subscriber::registry()
220+
.with(console_pretty_layer)
221+
.with(console_compact_layer)
222+
.with(forward_layer);
219223

220-
#[cfg(feature = "tokio-console")]
221-
let reg = reg.with(console_subscriber::spawn());
222-
Arc::new(reg) as Arc<dyn Subscriber + Send + Sync>
223-
});
224+
#[cfg(feature = "tokio-console")]
225+
let reg = reg.with(console_subscriber::spawn());
226+
Arc::new(reg) as Arc<dyn Subscriber + Send + Sync>
227+
})
228+
};
224229

225230
Ok(TelemetryInstance::new(
226231
tracing_sub,
@@ -231,6 +236,9 @@ pub fn telemetry_init(opts: TelemetryOptions) -> Result<TelemetryInstance, anyho
231236
))
232237
}
233238

239+
/// WARNING: Calling can cause panics because of https://github.com/tokio-rs/tracing/issues/1656
240+
/// Lang must not start using until resolved
241+
///
234242
/// Initialize telemetry/tracing globally. Useful for testing. Only takes affect when called
235243
/// the first time. Subsequent calls are ignored.
236244
pub fn telemetry_init_global(opts: TelemetryOptions) -> Result<(), anyhow::Error> {
@@ -247,8 +255,23 @@ pub fn telemetry_init_global(opts: TelemetryOptions) -> Result<(), anyhow::Error
247255
Ok(())
248256
}
249257

250-
#[cfg(test)]
251-
pub use test_initters::*;
258+
/// WARNING: Calling can cause panics because of https://github.com/tokio-rs/tracing/issues/1656
259+
/// Lang must not start using until resolved
260+
///
261+
/// Initialize the fallback global handler. All lang SDKs should call this somewhere, once, at
262+
/// startup, as it initializes a fallback handler for any dependencies (looking at you, otel) that
263+
/// don't provide good ways to customize their tracing usage. It sets a WARN-level global filter
264+
/// that uses the default console logger.
265+
pub fn telemetry_init_fallback() -> Result<(), anyhow::Error> {
266+
telemetry_init_global(
267+
TelemetryOptionsBuilder::default()
268+
.logging(Logger::Console {
269+
filter: construct_filter_string(Level::DEBUG, Level::WARN),
270+
})
271+
.build()?,
272+
)?;
273+
Ok(())
274+
}
252275

253276
/// A trait for using [Display] on the contents of vecs, etc, which don't implement it.
254277
///
@@ -275,24 +298,3 @@ where
275298
format!("[{}]", self.iter().format(","))
276299
}
277300
}
278-
279-
/// Helpers for test initialization
280-
#[cfg(test)]
281-
pub mod test_initters {
282-
use super::*;
283-
use temporal_sdk_core_api::telemetry::TelemetryOptionsBuilder;
284-
285-
/// Turn on logging to the console
286-
#[allow(dead_code)] // Not always used, called to enable for debugging when needed
287-
pub fn test_telem_console() {
288-
telemetry_init_global(
289-
TelemetryOptionsBuilder::default()
290-
.logging(Logger::Console {
291-
filter: construct_filter_string(Level::DEBUG, Level::WARN),
292-
})
293-
.build()
294-
.unwrap(),
295-
)
296-
.unwrap();
297-
}
298-
}

tests/global_metric_tests.rs

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
use parking_lot::Mutex;
2+
use std::{sync::Arc, time::Duration};
3+
use temporal_sdk_core::{
4+
CoreRuntime,
5+
telemetry::{build_otlp_metric_exporter, construct_filter_string, telemetry_init_global},
6+
};
7+
use temporal_sdk_core_api::telemetry::{
8+
Logger, OtelCollectorOptionsBuilder, TelemetryOptionsBuilder, metrics::CoreMeter,
9+
};
10+
use temporal_sdk_core_test_utils::CoreWfStarter;
11+
use tracing::Level;
12+
use tracing_subscriber::fmt::MakeWriter;
13+
14+
struct CapturingWriter {
15+
buf: Arc<Mutex<Vec<u8>>>,
16+
}
17+
18+
impl MakeWriter<'_> for CapturingWriter {
19+
type Writer = CapturingHandle;
20+
21+
fn make_writer(&self) -> Self::Writer {
22+
CapturingHandle(self.buf.clone())
23+
}
24+
}
25+
26+
struct CapturingHandle(Arc<Mutex<Vec<u8>>>);
27+
28+
impl std::io::Write for CapturingHandle {
29+
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
30+
let mut b = self.0.lock();
31+
b.extend_from_slice(buf);
32+
Ok(buf.len())
33+
}
34+
fn flush(&mut self) -> std::io::Result<()> {
35+
Ok(())
36+
}
37+
}
38+
39+
// TODO: This test is not actually run in CI right now because for it to actually work requires
40+
// a number of fixes in upstream libraries:
41+
//
42+
// * It regularly panics because of: https://github.com/tokio-rs/tracing/issues/1656
43+
// * Otel doesn't appear to actually be logging any warnings/errors on connection failure
44+
// * This whole thing is supposed to show a workaround for https://github.com/open-telemetry/opentelemetry-rust/issues/2697
45+
#[tokio::test]
46+
async fn otel_errors_logged_as_errors() {
47+
// Set up tracing subscriber to capture ERROR logs
48+
let logs = Arc::new(Mutex::new(Vec::new()));
49+
let writer = CapturingWriter { buf: logs.clone() };
50+
let subscriber = Arc::new(
51+
tracing_subscriber::fmt()
52+
.with_writer(writer)
53+
.with_env_filter("debug")
54+
.finish(),
55+
);
56+
let opts = OtelCollectorOptionsBuilder::default()
57+
.url("https://localhost:12345/v1/metrics".parse().unwrap()) // Nothing bound on that port
58+
.build()
59+
.unwrap();
60+
let exporter = build_otlp_metric_exporter(opts).unwrap();
61+
62+
// Global initialization is needed to capture (some) otel logging.
63+
telemetry_init_global(
64+
TelemetryOptionsBuilder::default()
65+
.subscriber_override(subscriber)
66+
.build()
67+
.unwrap(),
68+
)
69+
.unwrap();
70+
71+
let rt = CoreRuntime::new_assume_tokio(
72+
TelemetryOptionsBuilder::default()
73+
.metrics(Arc::new(exporter) as Arc<dyn CoreMeter>)
74+
// Importantly, _not_ using subscriber override, is using console.
75+
.logging(Logger::Console {
76+
filter: construct_filter_string(Level::INFO, Level::WARN),
77+
})
78+
.build()
79+
.unwrap(),
80+
)
81+
.unwrap();
82+
let mut starter = CoreWfStarter::new_with_runtime("otel_errors_logged_as_errors", rt);
83+
let _worker = starter.get_worker().await;
84+
85+
tracing::debug!("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ should be in global log");
86+
87+
// Wait to allow exporter to attempt sending metrics and fail.
88+
// Windows takes a while to fail the network attempt for some reason so 5s.
89+
tokio::time::sleep(Duration::from_secs(5)).await;
90+
91+
let logs = logs.lock();
92+
let log_str = String::from_utf8_lossy(&logs).into_owned();
93+
drop(logs);
94+
95+
// The core worker _isn't_ using the fallback, and shouldn't be captured
96+
assert!(
97+
!log_str.contains("Initializing worker"),
98+
"Core logging shouldn't have been caught by fallback"
99+
);
100+
assert!(
101+
log_str.contains("@@@@@@@@@"),
102+
"Expected fallback log not found in logs: {}",
103+
log_str
104+
);
105+
// TODO: OTel just doesn't actually log useful errors right now 🤷, see issues at top of test
106+
assert!(
107+
log_str.contains("ERROR"),
108+
"Expected ERROR log not found in logs: {}",
109+
log_str
110+
);
111+
assert!(
112+
log_str.contains("Metrics exporter otlp failed with the grpc server returns error"),
113+
"Expected an OTel exporter error message in logs: {}",
114+
log_str
115+
);
116+
}

0 commit comments

Comments
 (0)