Skip to content

Commit 293120b

Browse files
committed
Merge branch 'tracer-provider-drop-shutdown-check' of github.com:lalitb/opentelemetry-rust into tracer-provider-drop-shutdown-check
2 parents 2279f3a + 01234c5 commit 293120b

File tree

15 files changed

+135
-225
lines changed

15 files changed

+135
-225
lines changed

opentelemetry-otlp/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,6 @@
100100
//! * `grpc-tonic`: Use `tonic` as grpc layer. This is enabled by default.
101101
//! * `gzip-tonic`: Use gzip compression for `tonic` grpc layer.
102102
//! * `zstd-tonic`: Use zstd compression for `tonic` grpc layer.
103-
//! * `tls-tonic`: Enable TLS.
104103
//! * `tls-roots`: Adds system trust roots to rustls-based gRPC clients using the rustls-native-certs crate
105104
//! * `tls-webkpi-roots`: Embeds Mozilla's trust roots to rustls-based gRPC clients using the webkpi-roots crate
106105
//!

opentelemetry-sdk/src/logs/log_emitter.rs

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
use super::{BatchLogProcessor, LogProcessor, LogRecord, SimpleLogProcessor, TraceContext};
22
use crate::{export::logs::LogExporter, runtime::RuntimeChannel, Resource};
3-
use opentelemetry::otel_warn;
43
use opentelemetry::{
5-
global,
64
logs::{LogError, LogResult},
5+
otel_debug,
76
trace::TraceContextExt,
87
Context, InstrumentationLibrary,
98
};
@@ -126,17 +125,10 @@ impl LoggerProvider {
126125
if errs.is_empty() {
127126
Ok(())
128127
} else {
129-
otel_warn!(
130-
name: "logger_provider_shutdown_error",
131-
error = format!("{:?}", errs)
132-
);
133-
Err(LogError::Other(format!("{:?}", errs).into()))
128+
Err(LogError::Other(format!("{errs:?}").into()))
134129
}
135130
} else {
136-
otel_warn!(
137-
name: "logger_provider_already_shutdown"
138-
);
139-
Err(LogError::Other("logger provider already shut down".into()))
131+
Err(LogError::AlreadyShutdown("LoggerProvider".to_string()))
140132
}
141133
}
142134
}
@@ -154,6 +146,24 @@ impl LoggerProviderInner {
154146
let mut errs = vec![];
155147
for processor in &self.processors {
156148
if let Err(err) = processor.shutdown() {
149+
// Log at debug level because:
150+
// - The error is also returned to the user for handling (if applicable)
151+
// - Or the error occurs during `LoggerProviderInner::Drop` as part of telemetry shutdown,
152+
// which is non-actionable by the user
153+
match err {
154+
// specific handling for mutex poisioning
155+
LogError::MutexPoisoned(_) => {
156+
otel_debug!(
157+
name: "LoggerProvider.Drop.ShutdownMutexPoisoned",
158+
);
159+
}
160+
_ => {
161+
otel_debug!(
162+
name: "LoggerProvider.Drop.ShutdownError",
163+
error = format!("{err}")
164+
);
165+
}
166+
}
157167
errs.push(err);
158168
}
159169
}
@@ -164,10 +174,11 @@ impl LoggerProviderInner {
164174
impl Drop for LoggerProviderInner {
165175
fn drop(&mut self) {
166176
if !self.is_shutdown.load(Ordering::Relaxed) {
167-
let errs = self.shutdown();
168-
if !errs.is_empty() {
169-
global::handle_error(LogError::Other(format!("{:?}", errs).into()));
170-
}
177+
let _ = self.shutdown(); // errors are handled within shutdown
178+
} else {
179+
otel_debug!(
180+
name: "LoggerProvider.Drop.AlreadyShutdown"
181+
);
171182
}
172183
}
173184
}

opentelemetry-sdk/src/logs/log_processor.rs

Lines changed: 34 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,8 @@ use futures_util::{
1212
#[cfg(feature = "logs_level_enabled")]
1313
use opentelemetry::logs::Severity;
1414
use opentelemetry::{
15-
global,
1615
logs::{LogError, LogResult},
17-
otel_error, otel_warn, InstrumentationLibrary,
16+
otel_debug, otel_error, otel_warn, InstrumentationLibrary,
1817
};
1918

2019
use std::sync::atomic::AtomicBool;
@@ -99,26 +98,36 @@ impl LogProcessor for SimpleLogProcessor {
9998
fn emit(&self, record: &mut LogRecord, instrumentation: &InstrumentationLibrary) {
10099
// noop after shutdown
101100
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
101+
// this is a warning, as the user is trying to log after the processor has been shutdown
102102
otel_warn!(
103-
name: "simple_log_processor_emit_after_shutdown"
103+
name: "SimpleLogProcessor.Emit.ProcessorShutdown",
104104
);
105105
return;
106106
}
107107

108108
let result = self
109109
.exporter
110110
.lock()
111-
.map_err(|_| LogError::Other("simple logprocessor mutex poison".into()))
111+
.map_err(|_| LogError::MutexPoisoned("SimpleLogProcessor".into()))
112112
.and_then(|mut exporter| {
113113
let log_tuple = &[(record as &LogRecord, instrumentation)];
114114
futures_executor::block_on(exporter.export(LogBatch::new(log_tuple)))
115115
});
116-
if let Err(err) = result {
117-
otel_error!(
118-
name: "simple_log_processor_emit_error",
119-
error = format!("{:?}", err)
120-
);
121-
global::handle_error(err);
116+
// Handle errors with specific static names
117+
match result {
118+
Err(LogError::MutexPoisoned(_)) => {
119+
// logging as debug as this is not a user error
120+
otel_debug!(
121+
name: "SimpleLogProcessor.Emit.MutexPoisoning",
122+
);
123+
}
124+
Err(err) => {
125+
otel_error!(
126+
name: "SimpleLogProcessor.Emit.ExportError",
127+
error = format!("{}",err)
128+
);
129+
}
130+
_ => {}
122131
}
123132
}
124133

@@ -133,12 +142,7 @@ impl LogProcessor for SimpleLogProcessor {
133142
exporter.shutdown();
134143
Ok(())
135144
} else {
136-
otel_error!(
137-
name: "simple_log_processor_shutdown_error"
138-
);
139-
Err(LogError::Other(
140-
"simple logprocessor mutex poison during shutdown".into(),
141-
))
145+
Err(LogError::MutexPoisoned("SimpleLogProcessor".into()))
142146
}
143147
}
144148

@@ -170,12 +174,12 @@ impl<R: RuntimeChannel> LogProcessor for BatchLogProcessor<R> {
170174
instrumentation.clone(),
171175
)));
172176

177+
// TODO - Implement throttling to prevent error flooding when the queue is full or closed.
173178
if let Err(err) = result {
174179
otel_error!(
175-
name: "batch_log_processor_emit_error",
176-
error = format!("{:?}", err)
180+
name: "BatchLogProcessor.Export.Error",
181+
error = format!("{}", err)
177182
);
178-
global::handle_error(LogError::Other(err.into()));
179183
}
180184
}
181185

@@ -243,10 +247,9 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
243247

244248
if let Err(err) = result {
245249
otel_error!(
246-
name: "batch_log_processor_export_error",
247-
error = format!("{:?}", err)
250+
name: "BatchLogProcessor.Export.Error",
251+
error = format!("{}", err)
248252
);
249-
global::handle_error(err);
250253
}
251254
}
252255
}
@@ -261,24 +264,12 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
261264
.await;
262265

263266
if let Some(channel) = res_channel {
264-
if let Err(result) = channel.send(result) {
265-
global::handle_error(LogError::from(format!(
266-
"failed to send flush result: {:?}",
267-
result
268-
)));
269-
otel_error!(
270-
name: "batch_log_processor_flush_error",
271-
error = format!("{:?}", result),
272-
message = "Failed to send flush result"
267+
if let Err(send_error) = channel.send(result) {
268+
otel_debug!(
269+
name: "BatchLogProcessor.Flush.SendResultError",
270+
error = format!("{:?}", send_error),
273271
);
274272
}
275-
} else if let Err(err) = result {
276-
otel_error!(
277-
name: "batch_log_processor_flush_error",
278-
error = format!("{:?}", err),
279-
message = "Flush failed"
280-
);
281-
global::handle_error(err);
282273
}
283274
}
284275
// Stream has terminated or processor is shutdown, return to finish execution.
@@ -293,21 +284,14 @@ impl<R: RuntimeChannel> BatchLogProcessor<R> {
293284

294285
exporter.shutdown();
295286

296-
if let Err(result) = ch.send(result) {
297-
otel_error!(
298-
name: "batch_log_processor_shutdown_error",
299-
error = format!("{:?}", result),
300-
message = "Failed to send shutdown result"
287+
if let Err(send_error) = ch.send(result) {
288+
otel_debug!(
289+
name: "BatchLogProcessor.Shutdown.SendResultError",
290+
error = format!("{:?}", send_error),
301291
);
302-
global::handle_error(LogError::from(format!(
303-
"failed to send batch processor shutdown result: {:?}",
304-
result
305-
)));
306292
}
307-
308293
break;
309294
}
310-
311295
// propagate the resource
312296
BatchMessage::SetResource(resource) => {
313297
exporter.set_resource(&resource);
@@ -357,13 +341,7 @@ where
357341
pin_mut!(timeout);
358342
match future::select(export, timeout).await {
359343
Either::Left((export_res, _)) => export_res,
360-
Either::Right((_, _)) => {
361-
otel_error!(
362-
name: "export_with_timeout_timeout",
363-
timeout_duration = time_out.as_millis()
364-
);
365-
ExportResult::Err(LogError::ExportTimedOut(time_out))
366-
}
344+
Either::Right((_, _)) => ExportResult::Err(LogError::ExportTimedOut(time_out)),
367345
}
368346
}
369347

opentelemetry-sdk/src/metrics/meter.rs

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use crate::metrics::{
1717
pipeline::{Pipelines, Resolver},
1818
};
1919

20-
use super::noop::{NoopAsyncInstrument, NoopSyncInstrument};
20+
use super::noop::NoopSyncInstrument;
2121

2222
// maximum length of instrument name
2323
const INSTRUMENT_NAME_MAX_LENGTH: usize = 255;
@@ -108,7 +108,7 @@ impl SdkMeter {
108108
let validation_result = validate_instrument_config(builder.name.as_ref(), &builder.unit);
109109
if let Err(err) = validation_result {
110110
global::handle_error(err);
111-
return Ok(ObservableCounter::new(Arc::new(NoopAsyncInstrument::new())));
111+
return Ok(ObservableCounter::new());
112112
}
113113

114114
let ms = resolver.measures(
@@ -120,7 +120,7 @@ impl SdkMeter {
120120
)?;
121121

122122
if ms.is_empty() {
123-
return Ok(ObservableCounter::new(Arc::new(NoopAsyncInstrument::new())));
123+
return Ok(ObservableCounter::new());
124124
}
125125

126126
let observable = Arc::new(Observable::new(ms));
@@ -131,7 +131,7 @@ impl SdkMeter {
131131
.register_callback(move || callback(cb_inst.as_ref()));
132132
}
133133

134-
Ok(ObservableCounter::new(observable))
134+
Ok(ObservableCounter::new())
135135
}
136136

137137
fn create_observable_updown_counter<T>(
@@ -145,9 +145,7 @@ impl SdkMeter {
145145
let validation_result = validate_instrument_config(builder.name.as_ref(), &builder.unit);
146146
if let Err(err) = validation_result {
147147
global::handle_error(err);
148-
return Ok(ObservableUpDownCounter::new(Arc::new(
149-
NoopAsyncInstrument::new(),
150-
)));
148+
return Ok(ObservableUpDownCounter::new());
151149
}
152150

153151
let ms = resolver.measures(
@@ -159,9 +157,7 @@ impl SdkMeter {
159157
)?;
160158

161159
if ms.is_empty() {
162-
return Ok(ObservableUpDownCounter::new(Arc::new(
163-
NoopAsyncInstrument::new(),
164-
)));
160+
return Ok(ObservableUpDownCounter::new());
165161
}
166162

167163
let observable = Arc::new(Observable::new(ms));
@@ -172,7 +168,7 @@ impl SdkMeter {
172168
.register_callback(move || callback(cb_inst.as_ref()));
173169
}
174170

175-
Ok(ObservableUpDownCounter::new(observable))
171+
Ok(ObservableUpDownCounter::new())
176172
}
177173

178174
fn create_observable_gauge<T>(
@@ -186,7 +182,7 @@ impl SdkMeter {
186182
let validation_result = validate_instrument_config(builder.name.as_ref(), &builder.unit);
187183
if let Err(err) = validation_result {
188184
global::handle_error(err);
189-
return Ok(ObservableGauge::new(Arc::new(NoopAsyncInstrument::new())));
185+
return Ok(ObservableGauge::new());
190186
}
191187

192188
let ms = resolver.measures(
@@ -198,7 +194,7 @@ impl SdkMeter {
198194
)?;
199195

200196
if ms.is_empty() {
201-
return Ok(ObservableGauge::new(Arc::new(NoopAsyncInstrument::new())));
197+
return Ok(ObservableGauge::new());
202198
}
203199

204200
let observable = Arc::new(Observable::new(ms));
@@ -209,7 +205,7 @@ impl SdkMeter {
209205
.register_callback(move || callback(cb_inst.as_ref()));
210206
}
211207

212-
Ok(ObservableGauge::new(observable))
208+
Ok(ObservableGauge::new())
213209
}
214210

215211
fn create_updown_counter<T>(

opentelemetry-sdk/src/metrics/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ fn calculate_hash(values: &[KeyValue]) -> u64 {
104104

105105
impl AttributeSet {
106106
fn new(mut values: Vec<KeyValue>) -> Self {
107-
values.sort_unstable();
107+
values.sort_unstable_by(|a, b| a.key.cmp(&b.key));
108108
let hash = calculate_hash(&values);
109109
AttributeSet(values, hash)
110110
}

opentelemetry-sdk/src/metrics/noop.rs

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use opentelemetry::{
2-
metrics::{AsyncInstrument, InstrumentProvider, SyncInstrument},
2+
metrics::{InstrumentProvider, SyncInstrument},
33
KeyValue,
44
};
55

@@ -36,22 +36,3 @@ impl<T> SyncInstrument<T> for NoopSyncInstrument {
3636
// Ignored
3737
}
3838
}
39-
40-
/// A no-op async instrument.
41-
#[derive(Debug, Default)]
42-
pub(crate) struct NoopAsyncInstrument {
43-
_private: (),
44-
}
45-
46-
impl NoopAsyncInstrument {
47-
/// Create a new no-op async instrument
48-
pub(crate) fn new() -> Self {
49-
NoopAsyncInstrument { _private: () }
50-
}
51-
}
52-
53-
impl<T> AsyncInstrument<T> for NoopAsyncInstrument {
54-
fn observe(&self, _value: T, _attributes: &[KeyValue]) {
55-
// Ignored
56-
}
57-
}

opentelemetry/CHANGELOG.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@
44

55
- Bump MSRV to 1.70 [#2179](https://github.com/open-telemetry/opentelemetry-rust/pull/2179)
66
- Add `LogRecord::set_trace_context`; an optional method conditional on the `trace` feature for setting trace context on a log record.
7-
- Removed unnecessary public methods named `as_any` from `AsyncInstrument` trait and the implementing instruments: `ObservableCounter`, `ObservableGauge`, and `ObservableUpDownCounter` [#2187](https://github.com/open-telemetry/opentelemetry-rust/issues/2187)
8-
- Introduced `SyncInstrument` trait to replace the individual synchronous instrument traits (`SyncCounter`, `SyncGauge`, `SyncHistogram`, `SyncUpDownCounter`) which are meant for SDK implementation. [#2207](https://github.com/open-telemetry/opentelemetry-rust/issues/2207)
7+
- Removed unnecessary public methods named `as_any` from `AsyncInstrument` trait and the implementing instruments: `ObservableCounter`, `ObservableGauge`, and `ObservableUpDownCounter` [#2187](https://github.com/open-telemetry/opentelemetry-rust/pull/2187)
8+
- Introduced `SyncInstrument` trait to replace the individual synchronous instrument traits (`SyncCounter`, `SyncGauge`, `SyncHistogram`, `SyncUpDownCounter`) which are meant for SDK implementation. [#2207](https://github.com/open-telemetry/opentelemetry-rust/pull/2207)
9+
- Ensured that `observe` method on asynchronous instruments can only be called inside a callback. This was done by removing the implementation of `AsyncInstrument` trait for each of the asynchronous instruments. [#2210](https://github.com/open-telemetry/opentelemetry-rust/pull/2210)
10+
- Removed `PartialOrd` and `Ord` implementations for `KeyValue`. [#2215](https://github.com/open-telemetry/opentelemetry-rust/pull/2215)
911

1012
## v0.26.0
1113
Released 2024-Sep-30

0 commit comments

Comments
 (0)