Skip to content

Commit 5fb8038

Browse files
committed
Remove requiring mut on shutdown and resource on Log
1 parent d568100 commit 5fb8038

File tree

8 files changed

+50
-76
lines changed

8 files changed

+50
-76
lines changed

opentelemetry-otlp/src/exporter/http/logs.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ impl LogExporter for OtlpHttpClient {
5252
}
5353
}
5454

55-
fn shutdown(&mut self) -> OTelSdkResult {
55+
fn shutdown(&self) -> OTelSdkResult {
5656
let mut client_guard = self.client.lock().map_err(|e| {
5757
OTelSdkError::InternalFailure(format!("Failed to acquire client lock: {}", e))
5858
})?;
@@ -64,7 +64,7 @@ impl LogExporter for OtlpHttpClient {
6464
Ok(())
6565
}
6666

67-
fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
67+
fn set_resource(&self, resource: &opentelemetry_sdk::Resource) {
6868
self.resource = resource.into();
6969
}
7070
}

opentelemetry-sdk/src/logs/export.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,15 +136,15 @@ pub trait LogExporter: Send + Sync + Debug {
136136
) -> impl std::future::Future<Output = OTelSdkResult> + Send;
137137

138138
/// Shuts down the exporter.
139-
fn shutdown(&mut self) -> OTelSdkResult {
139+
fn shutdown(&self) -> OTelSdkResult {
140140
Ok(())
141141
}
142142
#[cfg(feature = "spec_unstable_logs_enabled")]
143-
/// Chek if logs are enabled.
143+
/// Check if logs are enabled.
144144
fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool {
145145
// By default, all logs are enabled
146146
true
147147
}
148148
/// Set the resource for the exporter.
149-
fn set_resource(&mut self, _resource: &Resource) {}
149+
fn set_resource(&self, _resource: &Resource) {}
150150
}

opentelemetry-sdk/src/logs/in_memory_exporter.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -208,14 +208,14 @@ impl LogExporter for InMemoryLogExporter {
208208
}
209209
}
210210

211-
fn shutdown(&mut self) -> OTelSdkResult {
211+
fn shutdown(&self) -> OTelSdkResult {
212212
if self.should_reset_on_shutdown {
213213
self.reset();
214214
}
215215
Ok(())
216216
}
217217

218-
fn set_resource(&mut self, resource: &Resource) {
218+
fn set_resource(&self, resource: &Resource) {
219219
let mut res_guard = self.resource.lock().expect("Resource lock poisoned");
220220
*res_guard = resource.clone();
221221
}

opentelemetry-sdk/src/logs/log_processor.rs

Lines changed: 21 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -135,53 +135,31 @@ pub trait LogProcessor: Send + Sync + Debug {
135135
/// ```
136136
#[derive(Debug)]
137137
pub struct SimpleLogProcessor<T: LogExporter> {
138-
exporter: Mutex<T>,
139-
is_shutdown: AtomicBool,
138+
exporter: T,
140139
}
141140

142141
impl<T: LogExporter> SimpleLogProcessor<T> {
143142
pub(crate) fn new(exporter: T) -> Self {
144143
SimpleLogProcessor {
145-
exporter: Mutex::new(exporter),
146-
is_shutdown: AtomicBool::new(false),
144+
exporter: exporter
147145
}
148146
}
149147
}
150148

151149
impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
152150
fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
153-
// noop after shutdown
154-
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
155-
// this is a warning, as the user is trying to log after the processor has been shutdown
156-
otel_warn!(
157-
name: "SimpleLogProcessor.Emit.ProcessorShutdown",
151+
let log_tuple = &[(record as &SdkLogRecord, instrumentation)];
152+
let result = futures_executor::block_on(self.exporter.export(LogBatch::new(log_tuple)));
153+
if result.is_err() {
154+
otel_error!(
155+
name: "SimpleLogProcessor.Emit.ExportError",
156+
error = format!("{}", result.err().unwrap())
158157
);
159-
return;
160158
}
161-
162-
let result = self
163-
.exporter
164-
.lock()
165-
.map_err(|_| OTelSdkError::InternalFailure("SimpleLogProcessor mutex poison".into()))
166-
.and_then(|exporter| {
167-
let log_tuple = &[(record as &SdkLogRecord, instrumentation)];
168-
futures_executor::block_on(exporter.export(LogBatch::new(log_tuple)))
169-
});
170-
// Handle errors with specific static names
171-
match result {
172-
Err(OTelSdkError::InternalFailure(_)) => {
173-
// logging as debug as this is not a user error
174-
otel_debug!(
175-
name: "SimpleLogProcessor.Emit.MutexPoisoning",
176-
);
177-
}
178-
Err(err) => {
179-
otel_error!(
180-
name: "SimpleLogProcessor.Emit.ExportError",
181-
error = format!("{}",err)
182-
);
183-
}
184-
_ => {}
159+
else {
160+
otel_debug!(
161+
name: "SimpleLogProcessor.Emit.Success",
162+
);
185163
}
186164
}
187165

@@ -190,21 +168,11 @@ impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
190168
}
191169

192170
fn shutdown(&self) -> OTelSdkResult {
193-
self.is_shutdown
194-
.store(true, std::sync::atomic::Ordering::Relaxed);
195-
if let Ok(mut exporter) = self.exporter.lock() {
196-
exporter.shutdown()
197-
} else {
198-
Err(OTelSdkError::InternalFailure(
199-
"SimpleLogProcessor mutex poison at shutdown".into(),
200-
))
201-
}
171+
self.exporter.shutdown()
202172
}
203173

204174
fn set_resource(&self, resource: &Resource) {
205-
if let Ok(mut exporter) = self.exporter.lock() {
206-
exporter.set_resource(resource);
207-
}
175+
self.exporter.set_resource(resource);
208176
}
209177
}
210178

@@ -481,7 +449,7 @@ impl LogProcessor for BatchLogProcessor {
481449
}
482450

483451
impl BatchLogProcessor {
484-
pub(crate) fn new<E>(mut exporter: E, config: BatchConfig) -> Self
452+
pub(crate) fn new<E>(exporter: E, config: BatchConfig) -> Self
485453
where
486454
E: LogExporter + Send + Sync + 'static,
487455
{
@@ -909,11 +877,11 @@ mod tests {
909877
async { Ok(()) }
910878
}
911879

912-
fn shutdown(&mut self) -> OTelSdkResult {
880+
fn shutdown(&self) -> OTelSdkResult {
913881
Ok(())
914882
}
915883

916-
fn set_resource(&mut self, resource: &Resource) {
884+
fn set_resource(&self, resource: &Resource) {
917885
self.resource
918886
.lock()
919887
.map(|mut res_opt| {
@@ -1171,17 +1139,14 @@ mod tests {
11711139
let instrumentation: InstrumentationScope = Default::default();
11721140

11731141
processor.emit(&mut record, &instrumentation);
1174-
11751142
processor.shutdown().unwrap();
11761143

1177-
let is_shutdown = processor
1178-
.is_shutdown
1179-
.load(std::sync::atomic::Ordering::Relaxed);
1180-
assert!(is_shutdown);
1181-
1144+
// Emit after shutdown.
11821145
processor.emit(&mut record, &instrumentation);
11831146

1184-
assert_eq!(1, exporter.get_emitted_logs().unwrap().len())
1147+
// SimpleProcessor does not do anything to check if logs
1148+
// are flowing after shutdown.
1149+
assert_eq!(2, exporter.get_emitted_logs().unwrap().len())
11851150
}
11861151

11871152
#[tokio::test(flavor = "current_thread")]

opentelemetry-sdk/src/logs/log_processor_with_async_runtime.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -325,11 +325,11 @@ mod tests {
325325
async { Ok(()) }
326326
}
327327

328-
fn shutdown(&mut self) -> OTelSdkResult {
328+
fn shutdown(&self) -> OTelSdkResult {
329329
Ok(())
330330
}
331331

332-
fn set_resource(&mut self, resource: &Resource) {
332+
fn set_resource(&self, resource: &Resource) {
333333
self.resource
334334
.lock()
335335
.map(|mut res_opt| {

opentelemetry-stdout/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,6 @@ opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["rt-tokio", "me
4040
opentelemetry-appender-tracing = { path = "../opentelemetry-appender-tracing"}
4141
opentelemetry-semantic-conventions = { path = "../opentelemetry-semantic-conventions" }
4242
tracing = { workspace = true, features = ["std"]}
43-
tracing-subscriber = { workspace = true, features = ["registry", "std"] }
43+
tracing-subscriber = { workspace = true, features = ["env-filter","registry", "std"] }
4444
tokio = { workspace = true, features = ["full"] }
4545
once_cell = { workspace = true }

opentelemetry-stdout/examples/basic.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,22 @@ fn init_metrics() -> opentelemetry_sdk::metrics::SdkMeterProvider {
4545
fn init_logs() -> opentelemetry_sdk::logs::SdkLoggerProvider {
4646
use opentelemetry_appender_tracing::layer;
4747
use opentelemetry_sdk::logs::SdkLoggerProvider;
48-
use tracing_subscriber::prelude::*;
48+
use tracing_subscriber::{prelude::*, EnvFilter};
4949

5050
let exporter = opentelemetry_stdout::LogExporter::default();
5151
let provider: SdkLoggerProvider = SdkLoggerProvider::builder()
5252
.with_simple_exporter(exporter)
5353
.with_resource(RESOURCE.clone())
5454
.build();
55+
let filter_otel = EnvFilter::new("info")
56+
.add_directive("hyper=off".parse().unwrap())
57+
.add_directive("opentelemetry=off".parse().unwrap())
58+
.add_directive("tonic=off".parse().unwrap())
59+
.add_directive("h2=off".parse().unwrap())
60+
.add_directive("reqwest=off".parse().unwrap());
5561
let layer = layer::OpenTelemetryTracingBridge::new(&provider);
56-
tracing_subscriber::registry().with(layer).init();
62+
let otel_layer = layer.with_filter(filter_otel);
63+
tracing_subscriber::registry().with(otel_layer).init();
5764
provider
5865
}
5966

opentelemetry-stdout/src/logs/exporter.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,20 @@ use core::fmt;
33
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
44
use opentelemetry_sdk::logs::LogBatch;
55
use opentelemetry_sdk::Resource;
6-
use std::sync::atomic;
6+
use std::sync::{atomic, Mutex};
77
use std::sync::atomic::Ordering;
88

99
/// An OpenTelemetry exporter that writes Logs to stdout on export.
1010
pub struct LogExporter {
11-
resource: Resource,
11+
resource: Mutex<Resource>,
1212
is_shutdown: atomic::AtomicBool,
1313
resource_emitted: atomic::AtomicBool,
1414
}
1515

1616
impl Default for LogExporter {
1717
fn default() -> Self {
1818
LogExporter {
19-
resource: Resource::builder().build(),
19+
resource: Mutex::new(Resource::builder().build()),
2020
is_shutdown: atomic::AtomicBool::new(false),
2121
resource_emitted: atomic::AtomicBool::new(false),
2222
}
@@ -49,10 +49,11 @@ impl opentelemetry_sdk::logs::LogExporter for LogExporter {
4949
print_logs(batch);
5050
} else {
5151
println!("Resource");
52-
if let Some(schema_url) = self.resource.schema_url() {
52+
let resource = self.resource.lock().unwrap();
53+
if let Some(schema_url) = resource.schema_url() {
5354
println!("\t Resource SchemaUrl: {:?}", schema_url);
5455
}
55-
self.resource.iter().for_each(|(k, v)| {
56+
resource.iter().for_each(|(k, v)| {
5657
println!("\t -> {}={:?}", k, v);
5758
});
5859
print_logs(batch);
@@ -63,13 +64,14 @@ impl opentelemetry_sdk::logs::LogExporter for LogExporter {
6364
}
6465
}
6566

66-
fn shutdown(&mut self) -> OTelSdkResult {
67+
fn shutdown(&self) -> OTelSdkResult {
6768
self.is_shutdown.store(true, atomic::Ordering::SeqCst);
6869
Ok(())
6970
}
7071

71-
fn set_resource(&mut self, res: &opentelemetry_sdk::Resource) {
72-
self.resource = res.clone();
72+
fn set_resource(&self, res: &opentelemetry_sdk::Resource) {
73+
let mut res_guard = self.resource.lock().unwrap();
74+
*res_guard = res.clone();
7375
}
7476
}
7577

0 commit comments

Comments
 (0)