Skip to content

Commit 96b7acc

Browse files
authored
Remove mut self reference from LogExporter::export() method. (#2380)
1 parent 957659f commit 96b7acc

File tree

12 files changed

+59
-29
lines changed

12 files changed

+59
-29
lines changed

opentelemetry-appender-tracing/benches/logs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ struct NoopExporter {
3434

3535
#[async_trait]
3636
impl LogExporter for NoopExporter {
37-
async fn export(&mut self, _: LogBatch<'_>) -> LogResult<()> {
37+
async fn export(&self, _: LogBatch<'_>) -> LogResult<()> {
3838
LogResult::Ok(())
3939
}
4040

opentelemetry-appender-tracing/src/layer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ mod tests {
247247

248248
#[async_trait]
249249
impl LogExporter for ReentrantLogExporter {
250-
async fn export(&mut self, _batch: LogBatch<'_>) -> LogResult<()> {
250+
async fn export(&self, _batch: LogBatch<'_>) -> LogResult<()> {
251251
// This will cause a deadlock as the export itself creates a log
252252
// while still within the lock of the SimpleLogProcessor.
253253
warn!(name: "my-event-name", target: "reentrant", event_id = 20, user_name = "otel", user_email = "[email protected]");

opentelemetry-otlp/src/exporter/http/logs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use super::OtlpHttpClient;
99

1010
#[async_trait]
1111
impl LogExporter for OtlpHttpClient {
12-
async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> {
12+
async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> {
1313
let client = self
1414
.client
1515
.lock()

opentelemetry-otlp/src/exporter/tonic/logs.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Chann
1010
use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;
1111

1212
use super::BoxInterceptor;
13+
use tokio::sync::Mutex;
1314

1415
pub(crate) struct TonicLogsClient {
1516
inner: Option<ClientInner>,
@@ -20,7 +21,7 @@ pub(crate) struct TonicLogsClient {
2021

2122
struct ClientInner {
2223
client: LogsServiceClient<Channel>,
23-
interceptor: BoxInterceptor,
24+
interceptor: Mutex<BoxInterceptor>,
2425
}
2526

2627
impl fmt::Debug for TonicLogsClient {
@@ -45,7 +46,7 @@ impl TonicLogsClient {
4546
TonicLogsClient {
4647
inner: Some(ClientInner {
4748
client,
48-
interceptor,
49+
interceptor: Mutex::new(interceptor),
4950
}),
5051
resource: Default::default(),
5152
}
@@ -54,11 +55,13 @@ impl TonicLogsClient {
5455

5556
#[async_trait]
5657
impl LogExporter for TonicLogsClient {
57-
async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> {
58-
let (mut client, metadata, extensions) = match &mut self.inner {
58+
async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> {
59+
let (mut client, metadata, extensions) = match &self.inner {
5960
Some(inner) => {
6061
let (m, e, _) = inner
6162
.interceptor
63+
.lock()
64+
.await // tokio::sync::Mutex doesn't return a poisoned error, so we can safely use the interceptor here
6265
.call(Request::new(()))
6366
.map_err(|e| LogError::Other(Box::new(e)))?
6467
.into_parts();

opentelemetry-otlp/src/logs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ impl LogExporter {
124124

125125
#[async_trait]
126126
impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
127-
async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> {
127+
async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> {
128128
self.client.export(batch).await
129129
}
130130

opentelemetry-sdk/CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,13 @@
1111
- `ResourceDetector.detect()` no longer supports timeout option.
1212
- `opentelemetry::global::shutdown_tracer_provider()` Removed from the API, should now use `tracer_provider.shutdown()` see [#2369](https://github.com/open-telemetry/opentelemetry-rust/pull/2369) for a migration example. "Tracer provider" is cheaply cloneable, so users are encouraged to set a clone of it as the global (ex: `global::set_tracer_provider(provider.clone()))`, so that instrumentations and other components can obtain tracers from `global::tracer()`. The tracer_provider must be kept around to call shutdown on it at the end of application (ex: `tracer_provider.shutdown()`)
1313

14+
- *Breaking* The LogExporter::export() method no longer requires a mutable reference to self.:
15+
Before:
16+
async fn export(&mut self, _batch: LogBatch<'_>) -> LogResult<()>
17+
After:
18+
async fn export(&self, _batch: LogBatch<'_>) -> LogResult<()>
19+
Custom exporters will need to internally synchronize any mutable state, if applicable.
20+
1421
## 0.27.1
1522

1623
Released 2024-Nov-27

opentelemetry-sdk/src/export/logs/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ pub trait LogExporter: Send + Sync + Debug {
8181
/// A `LogResult<()>`, which is a result type indicating either a successful export (with
8282
/// `Ok(())`) or an error (`Err(LogError)`) if the export operation failed.
8383
///
84-
async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()>;
84+
async fn export(&self, batch: LogBatch<'_>) -> LogResult<()>;
8585
/// Shuts down the exporter.
8686
fn shutdown(&mut self) {}
8787
#[cfg(feature = "spec_unstable_logs_enabled")]

opentelemetry-sdk/src/logs/log_processor.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
106106
.exporter
107107
.lock()
108108
.map_err(|_| LogError::MutexPoisoned("SimpleLogProcessor".into()))
109-
.and_then(|mut exporter| {
109+
.and_then(|exporter| {
110110
let log_tuple = &[(record as &LogRecord, instrumentation)];
111111
futures_executor::block_on(exporter.export(LogBatch::new(log_tuple)))
112112
});
@@ -586,7 +586,7 @@ mod tests {
586586

587587
#[async_trait]
588588
impl LogExporter for MockLogExporter {
589-
async fn export(&mut self, _batch: LogBatch<'_>) -> LogResult<()> {
589+
async fn export(&self, _batch: LogBatch<'_>) -> LogResult<()> {
590590
Ok(())
591591
}
592592

@@ -1093,7 +1093,7 @@ mod tests {
10931093

10941094
#[async_trait::async_trait]
10951095
impl LogExporter for LogExporterThatRequiresTokio {
1096-
async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> {
1096+
async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> {
10971097
// Simulate minimal dependency on tokio by sleeping asynchronously for a short duration
10981098
tokio::time::sleep(Duration::from_millis(50)).await;
10991099

opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ impl InMemoryLogExporter {
183183

184184
#[async_trait]
185185
impl LogExporter for InMemoryLogExporter {
186-
async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> {
186+
async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> {
187187
let mut logs_guard = self.logs.lock().map_err(LogError::from)?;
188188
for (log_record, instrumentation) in batch.iter() {
189189
let owned_log = OwnedLogData {

opentelemetry-stdout/src/logs/exporter.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,21 @@ use opentelemetry_sdk::export::logs::LogBatch;
55
use opentelemetry_sdk::logs::LogResult;
66
use opentelemetry_sdk::Resource;
77
use std::sync::atomic;
8+
use std::sync::atomic::Ordering;
89

910
/// An OpenTelemetry exporter that writes Logs to stdout on export.
1011
pub struct LogExporter {
1112
resource: Resource,
1213
is_shutdown: atomic::AtomicBool,
13-
resource_emitted: bool,
14+
resource_emitted: atomic::AtomicBool,
1415
}
1516

1617
impl Default for LogExporter {
1718
fn default() -> Self {
1819
LogExporter {
1920
resource: Resource::default(),
2021
is_shutdown: atomic::AtomicBool::new(false),
21-
resource_emitted: false,
22+
resource_emitted: atomic::AtomicBool::new(false),
2223
}
2324
}
2425
}
@@ -32,15 +33,18 @@ impl fmt::Debug for LogExporter {
3233
#[async_trait]
3334
impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
3435
/// Export spans to stdout
35-
async fn export(&mut self, batch: LogBatch<'_>) -> LogResult<()> {
36+
async fn export(&self, batch: LogBatch<'_>) -> LogResult<()> {
3637
if self.is_shutdown.load(atomic::Ordering::SeqCst) {
3738
return Err("exporter is shut down".into());
3839
} else {
3940
println!("Logs");
40-
if self.resource_emitted {
41+
if self
42+
.resource_emitted
43+
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
44+
.is_err()
45+
{
4146
print_logs(batch);
4247
} else {
43-
self.resource_emitted = true;
4448
println!("Resource");
4549
if let Some(schema_url) = self.resource.schema_url() {
4650
println!("\t Resource SchemaUrl: {:?}", schema_url);

0 commit comments

Comments
 (0)